001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.test; 019 020import static org.apache.hadoop.hbase.IntegrationTestingUtility.DEFAULT_REGIONS_PER_SERVER; 021import static org.apache.hadoop.hbase.IntegrationTestingUtility.PRESPLIT_TEST_TABLE; 022import static org.apache.hadoop.hbase.IntegrationTestingUtility.PRESPLIT_TEST_TABLE_KEY; 023import static org.apache.hadoop.hbase.IntegrationTestingUtility.REGIONS_PER_SERVER_KEY; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025 026import java.io.DataInput; 027import java.io.DataOutput; 028import java.io.FileNotFoundException; 029import java.io.IOException; 030import java.io.InterruptedIOException; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Set; 036import java.util.SortedSet; 037import java.util.TreeSet; 038import java.util.UUID; 039import java.util.concurrent.ThreadLocalRandom; 040import java.util.concurrent.atomic.AtomicInteger; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.conf.Configured; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.LocatedFileStatus; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.fs.RemoteIterator; 047import org.apache.hadoop.hbase.Cell; 048import org.apache.hadoop.hbase.HBaseConfiguration; 049import org.apache.hadoop.hbase.HConstants; 050import org.apache.hadoop.hbase.HRegionLocation; 051import org.apache.hadoop.hbase.IntegrationTestBase; 052import org.apache.hadoop.hbase.IntegrationTestingUtility; 053import org.apache.hadoop.hbase.MasterNotRunningException; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.client.Admin; 056import org.apache.hadoop.hbase.client.BufferedMutator; 057import org.apache.hadoop.hbase.client.BufferedMutatorParams; 058import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 059import org.apache.hadoop.hbase.client.Connection; 060import org.apache.hadoop.hbase.client.ConnectionConfiguration; 061import org.apache.hadoop.hbase.client.ConnectionFactory; 062import org.apache.hadoop.hbase.client.Get; 063import org.apache.hadoop.hbase.client.Mutation; 064import org.apache.hadoop.hbase.client.Put; 065import org.apache.hadoop.hbase.client.RegionLocator; 066import org.apache.hadoop.hbase.client.Result; 067import org.apache.hadoop.hbase.client.ResultScanner; 068import org.apache.hadoop.hbase.client.Scan; 069import org.apache.hadoop.hbase.client.Table; 070import org.apache.hadoop.hbase.client.TableDescriptor; 071import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 072import org.apache.hadoop.hbase.fs.HFileSystem; 073import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 074import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 075import org.apache.hadoop.hbase.mapreduce.TableMapper; 076import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl; 077import org.apache.hadoop.hbase.mapreduce.WALPlayer; 078import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy; 079import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory; 080import org.apache.hadoop.hbase.testclassification.IntegrationTests; 081import org.apache.hadoop.hbase.util.AbstractHBaseTool; 082import org.apache.hadoop.hbase.util.Bytes; 083import org.apache.hadoop.hbase.util.CommonFSUtils; 084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 085import org.apache.hadoop.hbase.util.Random64; 086import org.apache.hadoop.hbase.util.RegionSplitter; 087import org.apache.hadoop.hbase.wal.WALEdit; 088import org.apache.hadoop.hbase.wal.WALKey; 089import org.apache.hadoop.io.BytesWritable; 090import org.apache.hadoop.io.NullWritable; 091import org.apache.hadoop.io.Writable; 092import org.apache.hadoop.mapreduce.Counter; 093import org.apache.hadoop.mapreduce.CounterGroup; 094import org.apache.hadoop.mapreduce.Counters; 095import org.apache.hadoop.mapreduce.InputFormat; 096import org.apache.hadoop.mapreduce.InputSplit; 097import org.apache.hadoop.mapreduce.Job; 098import org.apache.hadoop.mapreduce.JobContext; 099import org.apache.hadoop.mapreduce.Mapper; 100import org.apache.hadoop.mapreduce.RecordReader; 101import org.apache.hadoop.mapreduce.Reducer; 102import org.apache.hadoop.mapreduce.TaskAttemptContext; 103import org.apache.hadoop.mapreduce.TaskAttemptID; 104import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 105import org.apache.hadoop.mapreduce.lib.input.FileSplit; 106import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat; 107import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 108import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 109import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 110import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat; 111import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 112import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 113import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; 114import org.apache.hadoop.util.Tool; 115import org.apache.hadoop.util.ToolRunner; 116import org.junit.jupiter.api.Tag; 117import org.junit.jupiter.api.Test; 118import org.slf4j.Logger; 119import org.slf4j.LoggerFactory; 120 121import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 122import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 123import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; 124import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 125import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; 126import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 127import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 128import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 129 130/** 131 * <p> 132 * This is an integration test borrowed from goraci, written by Keith Turner, which is in turn 133 * inspired by the Accumulo test called continous ingest (ci). The original source code can be found 134 * here: 135 * <ul> 136 * <li>https://github.com/keith-turner/goraci</li> 137 * <li>https://github.com/enis/goraci/</li> 138 * </ul> 139 * </p> 140 * <p> 141 * Apache Accumulo [0] has a simple test suite that verifies that data is not lost at scale. This 142 * test suite is called continuous ingest. This test runs many ingest clients that continually 143 * create linked lists containing 25 million nodes. At some point the clients are stopped and a map 144 * reduce job is run to ensure no linked list has a hole. A hole indicates data was lost. 145 * </p> 146 * <p> 147 * The nodes in the linked list are random. This causes each linked list to spread across the table. 148 * Therefore if one part of a table loses data, then it will be detected by references in another 149 * part of the table. 150 * </p> 151 * <p> 152 * <h3>THE ANATOMY OF THE TEST</h3> Below is rough sketch of how data is written. For specific 153 * details look at the Generator code. 154 * </p> 155 * <p> 156 * <ol> 157 * <li>Write out 1 million nodes (1M is the configurable 'width' mentioned below)</li> 158 * <li>Flush the client</li> 159 * <li>Write out 1 million that reference previous million</li> 160 * <li>If this is the 25th set of 1 million nodes, then update 1st set of million to point to last 161 * (25 is configurable; its the 'wrap multiplier' referred to below)</li> 162 * <li>goto 1</li> 163 * </ol> 164 * </p> 165 * <p> 166 * The key is that nodes only reference flushed nodes. Therefore a node should never reference a 167 * missing node, even if the ingest client is killed at any point in time. 168 * </p> 169 * <p> 170 * When running this test suite w/ Accumulo there is a script running in parallel called the 171 * Aggitator that randomly and continuously kills server processes. The outcome was that many data 172 * loss bugs were found in Accumulo by doing this. This test suite can also help find bugs that 173 * impact uptime and stability when run for days or weeks. 174 * </p> 175 * <p> 176 * This test suite consists the following 177 * <ul> 178 * <li>a few Java programs</li> 179 * <li>a little helper script to run the java programs</li> 180 * <li>a maven script to build it</li> 181 * </ul> 182 * </p> 183 * <p> 184 * When generating data, its best to have each map task generate a multiple of 25 million. The 185 * reason for this is that circular linked list are generated every 25M. Not generating a multiple 186 * in 25M will result in some nodes in the linked list not having references. The loss of an 187 * unreferenced node can not be detected. 188 * </p> 189 * <p> 190 * <h3>Below is a description of the Java programs</h3> 191 * <ul> 192 * <li>{@code Generator} - A map only job that generates data. As stated previously, its best to 193 * generate data in multiples of 25M. An option is also available to allow concurrent walkers to 194 * select and walk random flushed loops during this phase.</li> 195 * <li>{@code Verify} - A map reduce job that looks for holes. Look at the counts after running. 196 * {@code REFERENCED} and {@code UNREFERENCED} are ok, any {@code UNDEFINED} counts are bad. Do not 197 * run at the same time as the Generator.</li> 198 * <li>{@code Walker} - A standalone program that start following a linked list and emits timing 199 * info.</li> 200 * <li>{@code Print} - A standalone program that prints nodes in the linked list</li> 201 * <li>{@code Delete} - A standalone program that deletes a single node</li> 202 * </ul> 203 * This class can be run as a unit test, as an integration test, or from the command line 204 * </p> 205 * <p> 206 * ex: 207 * 208 * <pre> 209 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 210 * loop 2 1 100000 /temp 1 1000 50 1 0 211 * </pre> 212 * </p> 213 */ 214@Tag(IntegrationTests.TAG) 215public class IntegrationTestBigLinkedList extends IntegrationTestBase { 216 protected static final byte[] NO_KEY = new byte[1]; 217 protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table"; 218 protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList"; 219 protected static byte[] FAMILY_NAME = Bytes.toBytes("meta"); 220 private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big"); 221 private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny"); 222 223 // link to the id of the prev node in the linked list 224 protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev"); 225 226 // identifier of the mapred task that generated this row 227 protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client"); 228 229 // the id of the row within the same client. 230 protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count"); 231 232 /** How many rows to write per map task. This has to be a multiple of 25M */ 233 private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY = 234 "IntegrationTestBigLinkedList.generator.num_rows"; 235 236 private static final String GENERATOR_NUM_MAPPERS_KEY = 237 "IntegrationTestBigLinkedList.generator.map.tasks"; 238 239 private static final String GENERATOR_WIDTH_KEY = "IntegrationTestBigLinkedList.generator.width"; 240 241 private static final String GENERATOR_WRAP_KEY = "IntegrationTestBigLinkedList.generator.wrap"; 242 243 private static final String CONCURRENT_WALKER_KEY = 244 "IntegrationTestBigLinkedList.generator.concurrentwalkers"; 245 246 protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster 247 248 private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters 249 250 private static final int WIDTH_DEFAULT = 1000000; 251 252 /** 253 * The 'wrap multipler' default. 254 */ 255 private static final int WRAP_DEFAULT = 25; 256 private static final int ROWKEY_LENGTH = 16; 257 258 private static final int CONCURRENT_WALKER_DEFAULT = 0; 259 260 protected String toRun; 261 protected String[] otherArgs; 262 263 static class CINode { 264 byte[] key; 265 byte[] prev; 266 String client; 267 long count; 268 } 269 270 /** 271 * A Map only job that generates random linked list and stores them. 272 */ 273 static class Generator extends Configured implements Tool { 274 private static final Logger LOG = LoggerFactory.getLogger(Generator.class); 275 276 /** 277 * Set this configuration if you want to test single-column family flush works. If set, we will 278 * add a big column family and a small column family on either side of the usual ITBLL 'meta' 279 * column family. When we write out the ITBLL, we will also add to the big column family a value 280 * bigger than that for ITBLL and for small, something way smaller. The idea is that when 281 * flush-by-column family rather than by region is enabled, we can see if ITBLL is broke in any 282 * way. Here is how you would pass it: 283 * <p> 284 * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 285 * -Dgenerator.multiple.columnfamilies=true generator 1 10 g 286 */ 287 public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY = 288 "generator.multiple.columnfamilies"; 289 290 /** 291 * Set this configuration if you want to scale up the size of test data quickly. 292 * <p> 293 * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 294 * -Dgenerator.big.family.value.size=1024 generator 1 10 output 295 */ 296 public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size"; 297 298 public static enum GeneratorCounts { 299 SUCCESS, 300 TERMINATING, 301 UNDEFINED, 302 IOEXCEPTION 303 } 304 305 public static final String USAGE = "Usage : " + Generator.class.getSimpleName() 306 + " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" 307 + " <num walker threads>] \n" 308 + "Where <num nodes per map> should be a multiple of 'width' * 'wrap multiplier'.\n" 309 + "25M is default because default 'width' is 1M and default 'wrap multiplier' is 25.\n" 310 + "We write out 1M nodes and then flush the client. After 25 flushes, we connect \n" 311 + "first written nodes back to the 25th set.\n" 312 + "Walkers verify random flushed loops during Generation."; 313 314 public Job job; 315 316 static class GeneratorInputFormat extends InputFormat<BytesWritable, NullWritable> { 317 static class GeneratorInputSplit extends InputSplit implements Writable { 318 @Override 319 public long getLength() throws IOException, InterruptedException { 320 return 1; 321 } 322 323 @Override 324 public String[] getLocations() throws IOException, InterruptedException { 325 return new String[0]; 326 } 327 328 @Override 329 public void readFields(DataInput arg0) throws IOException { 330 } 331 332 @Override 333 public void write(DataOutput arg0) throws IOException { 334 } 335 } 336 337 static class GeneratorRecordReader extends RecordReader<BytesWritable, NullWritable> { 338 private long count; 339 private long numNodes; 340 // Use Random64 to avoid issue described in HBASE-21256. 341 private Random64 rand = new Random64(); 342 343 @Override 344 public void close() throws IOException { 345 } 346 347 @Override 348 public BytesWritable getCurrentKey() throws IOException, InterruptedException { 349 byte[] bytes = new byte[ROWKEY_LENGTH]; 350 rand.nextBytes(bytes); 351 return new BytesWritable(bytes); 352 } 353 354 @Override 355 public NullWritable getCurrentValue() throws IOException, InterruptedException { 356 return NullWritable.get(); 357 } 358 359 @Override 360 public float getProgress() throws IOException, InterruptedException { 361 return (float) (count / (double) numNodes); 362 } 363 364 @Override 365 public void initialize(InputSplit arg0, TaskAttemptContext context) 366 throws IOException, InterruptedException { 367 numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000); 368 } 369 370 @Override 371 public boolean nextKeyValue() throws IOException, InterruptedException { 372 return count++ < numNodes; 373 } 374 } 375 376 @Override 377 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 378 TaskAttemptContext context) throws IOException, InterruptedException { 379 GeneratorRecordReader rr = new GeneratorRecordReader(); 380 rr.initialize(split, context); 381 return rr; 382 } 383 384 @Override 385 public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException { 386 int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1); 387 388 ArrayList<InputSplit> splits = new ArrayList<>(numMappers); 389 390 for (int i = 0; i < numMappers; i++) { 391 splits.add(new GeneratorInputSplit()); 392 } 393 394 return splits; 395 } 396 } 397 398 /** Ensure output files from prev-job go to map inputs for current job */ 399 static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> { 400 @Override 401 protected boolean isSplitable(JobContext context, Path filename) { 402 return false; 403 } 404 } 405 406 /** 407 * Some ASCII art time: 408 * <p> 409 * [ . . . ] represents one batch of random longs of length WIDTH 410 * 411 * <pre> 412 * _________________________ 413 * | ______ | 414 * | | || 415 * .-+-----------------+-----.|| 416 * | | | ||| 417 * first = [ . . . . . . . . . . . ] ||| 418 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ||| 419 * | | | | | | | | | | | ||| 420 * prev = [ . . . . . . . . . . . ] ||| 421 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ||| 422 * | | | | | | | | | | | ||| 423 * current = [ . . . . . . . . . . . ] ||| 424 * ||| 425 * ... ||| 426 * ||| 427 * last = [ . . . . . . . . . . . ] ||| 428 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____||| 429 * | |________|| 430 * |___________________________| 431 * </pre> 432 */ 433 434 static class GeneratorMapper 435 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { 436 437 byte[][] first = null; 438 byte[][] prev = null; 439 byte[][] current = null; 440 byte[] id; 441 long count = 0; 442 int i; 443 BufferedMutator mutator; 444 Connection connection; 445 long numNodes; 446 long wrap; 447 int width; 448 boolean multipleUnevenColumnFamilies; 449 byte[] tinyValue = new byte[] { 't' }; 450 byte[] bigValue = null; 451 Configuration conf; 452 // Use Random64 to avoid issue described in HBASE-21256. 453 private Random64 rand = new Random64(); 454 455 volatile boolean walkersStop; 456 int numWalkers; 457 final Object flushedLoopsLock = new Object(); 458 volatile List<Long> flushedLoops = new ArrayList<>(); 459 List<Thread> walkers = new ArrayList<>(); 460 461 @Override 462 protected void setup(Context context) throws IOException, InterruptedException { 463 id = Bytes.toBytes("Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID()); 464 this.connection = ConnectionFactory.createConnection(context.getConfiguration()); 465 instantiateHTable(); 466 this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT); 467 current = new byte[this.width][]; 468 int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT); 469 this.wrap = (long) wrapMultiplier * width; 470 this.numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 471 (long) WIDTH_DEFAULT * WRAP_DEFAULT); 472 if (this.numNodes < this.wrap) { 473 this.wrap = this.numNodes; 474 } 475 this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration()); 476 this.numWalkers = 477 context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT); 478 this.walkersStop = false; 479 this.conf = context.getConfiguration(); 480 481 if (multipleUnevenColumnFamilies) { 482 int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256); 483 int limit = 484 context.getConfiguration().getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 485 ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT); 486 487 Preconditions.checkArgument(n <= limit, "%s(%s) > %s(%s)", BIG_FAMILY_VALUE_SIZE_KEY, n, 488 ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit); 489 490 bigValue = new byte[n]; 491 rand.nextBytes(bigValue); 492 LOG.info("Create a bigValue with " + n + " bytes."); 493 } 494 495 Preconditions.checkArgument(numNodes > 0, "numNodes(%s) <= 0", numNodes); 496 Preconditions.checkArgument(numNodes % width == 0, "numNodes(%s) mod width(%s) != 0", 497 numNodes, width); 498 Preconditions.checkArgument(numNodes % wrap == 0, "numNodes(%s) mod wrap(%s) != 0", 499 numNodes, wrap); 500 } 501 502 protected void instantiateHTable() throws IOException { 503 mutator = connection 504 .getBufferedMutator(new BufferedMutatorParams(getTableName(connection.getConfiguration())) 505 .writeBufferSize(4 * 1024 * 1024)); 506 } 507 508 @Override 509 protected void cleanup(Context context) throws IOException, InterruptedException { 510 joinWalkers(); 511 mutator.close(); 512 connection.close(); 513 } 514 515 @Override 516 protected void map(BytesWritable key, NullWritable value, Context output) throws IOException { 517 current[i] = new byte[key.getLength()]; 518 System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength()); 519 if (++i == current.length) { 520 LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i=", current.length, 521 count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i); 522 persist(output, count, prev, current, id); 523 i = 0; 524 525 if (first == null) { 526 first = current; 527 } 528 prev = current; 529 current = new byte[this.width][]; 530 531 count += current.length; 532 output.setStatus("Count " + count); 533 534 if (count % wrap == 0) { 535 // this block of code turns the 1 million linked list of length 25 into one giant 536 // circular linked list of 25 million 537 circularLeftShift(first); 538 persist(output, -1, prev, first, null); 539 // At this point the entire loop has been flushed so we can add one of its nodes to the 540 // concurrent walker 541 if (numWalkers > 0) { 542 addFlushed(key.getBytes()); 543 if (walkers.isEmpty()) { 544 startWalkers(numWalkers, conf, output); 545 } 546 } 547 first = null; 548 prev = null; 549 } 550 } 551 } 552 553 private static <T> void circularLeftShift(T[] first) { 554 T ez = first[0]; 555 System.arraycopy(first, 1, first, 0, first.length - 1); 556 first[first.length - 1] = ez; 557 } 558 559 private void addFlushed(byte[] rowKey) { 560 synchronized (flushedLoopsLock) { 561 flushedLoops.add(Bytes.toLong(rowKey)); 562 flushedLoopsLock.notifyAll(); 563 } 564 } 565 566 protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id) 567 throws IOException { 568 for (int i = 0; i < current.length; i++) { 569 570 if (i % 100 == 0) { 571 // Tickle progress every so often else maprunner will think us hung 572 output.progress(); 573 } 574 575 Put put = new Put(current[i]); 576 put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]); 577 578 if (count >= 0) { 579 put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i)); 580 } 581 if (id != null) { 582 put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id); 583 } 584 // See if we are to write multiple columns. 585 if (this.multipleUnevenColumnFamilies) { 586 // Use any column name. 587 put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue); 588 // Use any column name. 589 put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue); 590 } 591 mutator.mutate(put); 592 } 593 594 mutator.flush(); 595 } 596 597 private void startWalkers(int numWalkers, Configuration conf, Context context) { 598 LOG.info("Starting " + numWalkers + " concurrent walkers"); 599 for (int i = 0; i < numWalkers; i++) { 600 Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context)); 601 walker.start(); 602 walkers.add(walker); 603 } 604 } 605 606 private void joinWalkers() { 607 synchronized (flushedLoopsLock) { 608 walkersStop = true; 609 flushedLoopsLock.notifyAll(); 610 } 611 for (Thread walker : walkers) { 612 Uninterruptibles.joinUninterruptibly(walker); 613 } 614 } 615 616 /** 617 * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by 618 * spawning ConcurrentWalker's with specified StartNodes. These ConcurrentWalker's are 619 * configured to only log erroneous nodes. 620 */ 621 622 public class ContinuousConcurrentWalker implements Runnable { 623 624 ConcurrentWalker walker; 625 Configuration conf; 626 Context context; 627 628 public ContinuousConcurrentWalker(Configuration conf, Context context) { 629 this.conf = conf; 630 this.context = context; 631 } 632 633 @Override 634 public void run() { 635 while (!walkersStop) { 636 try { 637 long node = selectLoop(); 638 try { 639 walkLoop(node); 640 } catch (IOException e) { 641 context.getCounter(GeneratorCounts.IOEXCEPTION).increment(1l); 642 return; 643 } 644 } catch (InterruptedException e) { 645 return; 646 } 647 } 648 } 649 650 private void walkLoop(long node) throws IOException { 651 walker = new ConcurrentWalker(context); 652 walker.setConf(conf); 653 walker.run(node, wrap); 654 } 655 656 private long selectLoop() throws InterruptedException { 657 synchronized (flushedLoopsLock) { 658 while (flushedLoops.isEmpty() && !walkersStop) { 659 flushedLoopsLock.wait(); 660 } 661 if (walkersStop) { 662 throw new InterruptedException(); 663 } 664 return flushedLoops.get(ThreadLocalRandom.current().nextInt(flushedLoops.size())); 665 } 666 } 667 } 668 669 public static class ConcurrentWalker extends WalkerBase { 670 671 Context context; 672 673 public ConcurrentWalker(Context context) { 674 this.context = context; 675 } 676 677 public void run(long startKeyIn, long maxQueriesIn) throws IOException { 678 679 long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE; 680 byte[] startKey = Bytes.toBytes(startKeyIn); 681 682 Connection connection = ConnectionFactory.createConnection(getConf()); 683 Table table = connection.getTable(getTableName(getConf())); 684 long numQueries = 0; 685 // If isSpecificStart is set, only walk one list from that particular node. 686 // Note that in case of circular (or P-shaped) list it will walk forever, as is 687 // the case in normal run without startKey. 688 689 CINode node = findStartNode(table, startKey); 690 if (node == null) { 691 LOG.error("Start node not found: " + Bytes.toStringBinary(startKey)); 692 throw new IOException("Start node not found: " + startKeyIn); 693 } 694 while (numQueries < maxQueries) { 695 numQueries++; 696 byte[] prev = node.prev; 697 node = getNode(prev, table, node); 698 if (node == null) { 699 LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev)); 700 context.getCounter(GeneratorCounts.UNDEFINED).increment(1l); 701 } else if (node.prev.length == NO_KEY.length) { 702 LOG.error( 703 "ConcurrentWalker found TERMINATING NODE: " + Bytes.toStringBinary(node.key)); 704 context.getCounter(GeneratorCounts.TERMINATING).increment(1l); 705 } else { 706 // Increment for successful walk 707 context.getCounter(GeneratorCounts.SUCCESS).increment(1l); 708 } 709 } 710 table.close(); 711 connection.close(); 712 } 713 } 714 } 715 716 @Override 717 public int run(String[] args) throws Exception { 718 if (args.length < 3) { 719 System.err.println(USAGE); 720 return 1; 721 } 722 try { 723 int numMappers = Integer.parseInt(args[0]); 724 long numNodes = Long.parseLong(args[1]); 725 Path tmpOutput = new Path(args[2]); 726 Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]); 727 Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]); 728 Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]); 729 return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 730 } catch (NumberFormatException e) { 731 System.err.println("Parsing generator arguments failed: " + e.getMessage()); 732 System.err.println(USAGE); 733 return 1; 734 } 735 } 736 737 protected void createSchema() throws IOException { 738 Configuration conf = getConf(); 739 TableName tableName = getTableName(conf); 740 try (Connection conn = ConnectionFactory.createConnection(conf); 741 Admin admin = conn.getAdmin()) { 742 if (!admin.tableExists(tableName)) { 743 TableDescriptor tableDescriptor = TableDescriptorBuilder 744 .newBuilder(getTableName(getConf())) 745 // if -DuseMob=true force all data through mob path. 746 .setColumnFamily( 747 setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME)).build()) 748 // Always add these families. Just skip writing to them when we do not test per CF 749 // flush. 750 .setColumnFamily( 751 setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(BIG_FAMILY_NAME)) 752 .build()) 753 .setColumnFamily( 754 setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(TINY_FAMILY_NAME)) 755 .build()) 756 .build(); 757 758 // If we want to pre-split compute how many splits. 759 if (conf.getBoolean(PRESPLIT_TEST_TABLE_KEY, PRESPLIT_TEST_TABLE)) { 760 int numberOfServers = admin.getRegionServers().size(); 761 if (numberOfServers == 0) { 762 throw new IllegalStateException("No live regionservers"); 763 } 764 int regionsPerServer = conf.getInt(REGIONS_PER_SERVER_KEY, DEFAULT_REGIONS_PER_SERVER); 765 int totalNumberOfRegions = numberOfServers * regionsPerServer; 766 LOG.info("Number of live regionservers: " + numberOfServers + ", " 767 + "pre-splitting table into " + totalNumberOfRegions + " regions " 768 + "(default regions per server: " + regionsPerServer + ")"); 769 770 byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions); 771 772 admin.createTable(tableDescriptor, splits); 773 } else { 774 // Looks like we're just letting things play out. 775 // Create a table with on region by default. 776 // This will make the splitting work hard. 777 admin.createTable(tableDescriptor); 778 } 779 } 780 } catch (MasterNotRunningException e) { 781 LOG.error("Master not running", e); 782 throw new IOException(e); 783 } 784 } 785 786 public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width, 787 Integer wrapMultiplier, Integer numWalkers) throws Exception { 788 LOG.info( 789 "Running RandomInputGenerator with numMappers=" + numMappers + ", numNodes=" + numNodes); 790 Job job = Job.getInstance(getConf()); 791 792 job.setJobName("Random Input Generator"); 793 job.setNumReduceTasks(0); 794 job.setJarByClass(getClass()); 795 796 job.setInputFormatClass(GeneratorInputFormat.class); 797 job.setOutputKeyClass(BytesWritable.class); 798 job.setOutputValueClass(NullWritable.class); 799 800 setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); 801 802 job.setMapperClass(Mapper.class); // identity mapper 803 804 FileOutputFormat.setOutputPath(job, tmpOutput); 805 job.setOutputFormatClass(SequenceFileOutputFormat.class); 806 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class); 807 808 boolean success = jobCompletion(job); 809 810 return success ? 0 : 1; 811 } 812 813 public int runGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width, 814 Integer wrapMultiplier, Integer numWalkers) throws Exception { 815 LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes); 816 createSchema(); 817 job = Job.getInstance(getConf()); 818 819 job.setJobName("Link Generator"); 820 job.setNumReduceTasks(0); 821 job.setJarByClass(getClass()); 822 823 FileInputFormat.setInputPaths(job, tmpOutput); 824 job.setInputFormatClass(OneFilePerMapperSFIF.class); 825 job.setOutputKeyClass(NullWritable.class); 826 job.setOutputValueClass(NullWritable.class); 827 828 setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); 829 830 setMapperForGenerator(job); 831 832 job.setOutputFormatClass(NullOutputFormat.class); 833 834 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 835 TableMapReduceUtil.addDependencyJars(job); 836 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 837 AbstractHBaseTool.class); 838 TableMapReduceUtil.initCredentials(job); 839 840 boolean success = jobCompletion(job); 841 842 return success ? 0 : 1; 843 } 844 845 protected boolean jobCompletion(Job job) 846 throws IOException, InterruptedException, ClassNotFoundException { 847 boolean success = job.waitForCompletion(true); 848 return success; 849 } 850 851 protected void setMapperForGenerator(Job job) { 852 job.setMapperClass(GeneratorMapper.class); 853 } 854 855 public int run(int numMappers, long numNodes, Path tmpOutput, Integer width, 856 Integer wrapMultiplier, Integer numWalkers) throws Exception { 857 int ret = 858 runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 859 if (ret > 0) { 860 return ret; 861 } 862 return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 863 } 864 865 public boolean verify() { 866 try { 867 Counters counters = job.getCounters(); 868 if (counters == null) { 869 LOG.info("Counters object was null, Generator verification cannot be performed." 870 + " This is commonly a result of insufficient YARN configuration."); 871 return false; 872 } 873 874 if ( 875 counters.findCounter(GeneratorCounts.TERMINATING).getValue() > 0 876 || counters.findCounter(GeneratorCounts.UNDEFINED).getValue() > 0 877 || counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue() > 0 878 ) { 879 LOG.error("Concurrent walker failed to verify during Generation phase"); 880 LOG.error( 881 "TERMINATING nodes: " + counters.findCounter(GeneratorCounts.TERMINATING).getValue()); 882 LOG.error( 883 "UNDEFINED nodes: " + counters.findCounter(GeneratorCounts.UNDEFINED).getValue()); 884 LOG.error( 885 "IOEXCEPTION nodes: " + counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue()); 886 return false; 887 } 888 } catch (IOException e) { 889 LOG.info("Generator verification could not find counter"); 890 return false; 891 } 892 return true; 893 } 894 } 895 896 private static ColumnFamilyDescriptorBuilder setMobProperties(final Configuration conf, 897 final ColumnFamilyDescriptorBuilder builder) { 898 if (conf.getBoolean("useMob", false)) { 899 builder.setMobEnabled(true); 900 builder.setMobThreshold(4); 901 } 902 return builder; 903 } 904 905 /** 906 * Tool to search missing rows in WALs and hfiles. Pass in file or dir of keys to search for. Key 907 * file must have been written by Verify step (we depend on the format it writes out. We'll read 908 * them in and then search in hbase WALs and oldWALs dirs (Some of this is TODO). 909 */ 910 static class Search extends Configured implements Tool { 911 private static final Logger LOG = LoggerFactory.getLogger(Search.class); 912 protected Job job; 913 914 private static void printUsage(final String error) { 915 if (error != null && error.length() > 0) System.out.println("ERROR: " + error); 916 System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]"); 917 } 918 919 @Override 920 public int run(String[] args) throws Exception { 921 if (args.length < 1 || args.length > 2) { 922 printUsage(null); 923 return 1; 924 } 925 Path inputDir = new Path(args[0]); 926 int numMappers = 1; 927 if (args.length > 1) { 928 numMappers = Integer.parseInt(args[1]); 929 } 930 return run(inputDir, numMappers); 931 } 932 933 /** 934 * WALPlayer override that searches for keys loaded in the setup. 935 */ 936 public static class WALSearcher extends WALPlayer { 937 public WALSearcher(Configuration conf) { 938 super(conf); 939 } 940 941 /** 942 * The actual searcher mapper. 943 */ 944 public static class WALMapperSearcher extends WALMapper { 945 private SortedSet<byte[]> keysToFind; 946 private AtomicInteger rows = new AtomicInteger(0); 947 948 @Override 949 public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context) 950 throws IOException { 951 super.setup(context); 952 try { 953 this.keysToFind = readKeysToSearch(context.getConfiguration()); 954 LOG.info("Loaded keys to find: count=" + this.keysToFind.size()); 955 } catch (InterruptedException e) { 956 throw new InterruptedIOException(e.toString()); 957 } 958 } 959 960 @Override 961 protected boolean filter(Context context, Cell cell) { 962 // TODO: Can I do a better compare than this copying out key? 963 byte[] row = new byte[cell.getRowLength()]; 964 System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength()); 965 boolean b = this.keysToFind.contains(row); 966 if (b) { 967 String keyStr = Bytes.toStringBinary(row); 968 try { 969 LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey()); 970 } catch (IOException | InterruptedException e) { 971 LOG.warn(e.toString(), e); 972 } 973 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 974 context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1); 975 } 976 context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1); 977 } 978 return b; 979 } 980 } 981 982 // Put in place the above WALMapperSearcher. 983 @Override 984 public Job createSubmittableJob(String[] args) throws IOException { 985 Job job = super.createSubmittableJob(args); 986 // Call my class instead. 987 job.setJarByClass(WALMapperSearcher.class); 988 job.setMapperClass(WALMapperSearcher.class); 989 job.setOutputFormatClass(NullOutputFormat.class); 990 return job; 991 } 992 } 993 994 static final String FOUND_GROUP_KEY = "Found"; 995 static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir"; 996 997 public int run(Path inputDir, int numMappers) throws Exception { 998 getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString()); 999 SortedSet<byte[]> keys = readKeysToSearch(getConf()); 1000 if (keys.isEmpty()) throw new RuntimeException("No keys to find"); 1001 LOG.info("Count of keys to find: " + keys.size()); 1002 for (byte[] key : keys) 1003 LOG.info("Key: " + Bytes.toStringBinary(key)); 1004 // Now read all WALs. In two dirs. Presumes certain layout. 1005 Path walsDir = 1006 new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME); 1007 Path oldWalsDir = 1008 new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME); 1009 LOG.info("Running Search with keys inputDir=" + inputDir + ", numMappers=" + numMappers 1010 + " against " + getConf().get(HConstants.HBASE_DIR)); 1011 int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()), 1012 new String[] { walsDir.toString(), "" }); 1013 if (ret != 0) { 1014 return ret; 1015 } 1016 return ToolRunner.run(getConf(), new WALSearcher(getConf()), 1017 new String[] { oldWalsDir.toString(), "" }); 1018 } 1019 1020 static SortedSet<byte[]> readKeysToSearch(final Configuration conf) 1021 throws IOException, InterruptedException { 1022 Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY)); 1023 FileSystem fs = FileSystem.get(conf); 1024 SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1025 if (!fs.exists(keysInputDir)) { 1026 throw new FileNotFoundException(keysInputDir.toString()); 1027 } 1028 if (!fs.isDirectory(keysInputDir)) { 1029 throw new UnsupportedOperationException("TODO"); 1030 } else { 1031 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false); 1032 while (iterator.hasNext()) { 1033 LocatedFileStatus keyFileStatus = iterator.next(); 1034 // Skip "_SUCCESS" file. 1035 if (keyFileStatus.getPath().getName().startsWith("_")) continue; 1036 result.addAll(readFileToSearch(conf, keyFileStatus)); 1037 } 1038 } 1039 return result; 1040 } 1041 1042 private static SortedSet<byte[]> readFileToSearch(final Configuration conf, 1043 final LocatedFileStatus keyFileStatus) throws IOException, InterruptedException { 1044 SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1045 // Return entries that are flagged VerifyCounts.UNDEFINED in the value. Return the row. This 1046 // is 1047 // what is missing. 1048 TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); 1049 try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr = 1050 new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) { 1051 InputSplit is = 1052 new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String[] {}); 1053 rr.initialize(is, context); 1054 while (rr.nextKeyValue()) { 1055 rr.getCurrentKey(); 1056 BytesWritable bw = rr.getCurrentValue(); 1057 if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.VerifyCounts.UNDEFINED) { 1058 byte[] key = new byte[rr.getCurrentKey().getLength()]; 1059 System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, 1060 rr.getCurrentKey().getLength()); 1061 result.add(key); 1062 } 1063 } 1064 } 1065 return result; 1066 } 1067 } 1068 1069 /** 1070 * A Map Reduce job that verifies that the linked lists generated by {@link Generator} do not have 1071 * any holes. 1072 */ 1073 static class Verify extends Configured implements Tool { 1074 private static final Logger LOG = LoggerFactory.getLogger(Verify.class); 1075 protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 }); 1076 protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 }); 1077 protected Job job; 1078 1079 public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> { 1080 private BytesWritable row = new BytesWritable(); 1081 private BytesWritable ref = new BytesWritable(); 1082 private boolean multipleUnevenColumnFamilies; 1083 1084 @Override 1085 protected void 1086 setup(Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context) 1087 throws IOException, InterruptedException { 1088 this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration()); 1089 } 1090 1091 @Override 1092 protected void map(ImmutableBytesWritable key, Result value, Context context) 1093 throws IOException, InterruptedException { 1094 byte[] rowKey = key.get(); 1095 row.set(rowKey, 0, rowKey.length); 1096 if ( 1097 multipleUnevenColumnFamilies && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) 1098 || !value.containsColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME)) 1099 ) { 1100 context.write(row, DEF_LOST_FAMILIES); 1101 } else { 1102 context.write(row, DEF); 1103 } 1104 byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV); 1105 if (prev != null && prev.length > 0) { 1106 ref.set(prev, 0, prev.length); 1107 context.write(ref, row); 1108 } else { 1109 LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey))); 1110 } 1111 } 1112 } 1113 1114 /** 1115 * Don't change the order of these enums. Their ordinals are used as type flag when we emit 1116 * problems found from the reducer. 1117 */ 1118 public static enum VerifyCounts { 1119 UNREFERENCED, 1120 UNDEFINED, 1121 REFERENCED, 1122 CORRUPT, 1123 EXTRAREFERENCES, 1124 EXTRA_UNDEF_REFERENCES, 1125 LOST_FAMILIES 1126 } 1127 1128 /** 1129 * Per reducer, we output problem rows as byte arrays so can be used as input for subsequent 1130 * investigative mapreduce jobs. Each emitted value is prefaced by a one byte flag saying what 1131 * sort of emission it is. Flag is the Count enum ordinal as a short. 1132 */ 1133 public static class VerifyReducer 1134 extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> { 1135 private ArrayList<byte[]> refs = new ArrayList<>(); 1136 private final BytesWritable UNREF = 1137 new BytesWritable(addPrefixFlag(VerifyCounts.UNREFERENCED.ordinal(), new byte[] {})); 1138 private final BytesWritable LOSTFAM = 1139 new BytesWritable(addPrefixFlag(VerifyCounts.LOST_FAMILIES.ordinal(), new byte[] {})); 1140 1141 private AtomicInteger rows = new AtomicInteger(0); 1142 private Connection connection; 1143 1144 @Override 1145 protected void 1146 setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context) 1147 throws IOException, InterruptedException { 1148 super.setup(context); 1149 this.connection = ConnectionFactory.createConnection(context.getConfiguration()); 1150 } 1151 1152 @Override 1153 protected void 1154 cleanup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context) 1155 throws IOException, InterruptedException { 1156 if (this.connection != null) { 1157 this.connection.close(); 1158 } 1159 super.cleanup(context); 1160 } 1161 1162 /** 1163 * Returns new byte array that has <code>ordinal</code> as prefix on front taking up 1164 * Bytes.SIZEOF_SHORT bytes followed by <code>r</code> 1165 */ 1166 public static byte[] addPrefixFlag(final int ordinal, final byte[] r) { 1167 byte[] prefix = Bytes.toBytes((short) ordinal); 1168 if (prefix.length != Bytes.SIZEOF_SHORT) { 1169 throw new RuntimeException("Unexpected size: " + prefix.length); 1170 } 1171 byte[] result = new byte[prefix.length + r.length]; 1172 System.arraycopy(prefix, 0, result, 0, prefix.length); 1173 System.arraycopy(r, 0, result, prefix.length, r.length); 1174 return result; 1175 } 1176 1177 /** 1178 * Returns type from the Counts enum of this row. Reads prefix added by 1179 * {@link #addPrefixFlag(int, byte[])} 1180 */ 1181 public static VerifyCounts whichType(final byte[] bs) { 1182 int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT); 1183 return VerifyCounts.values()[ordinal]; 1184 } 1185 1186 /** Returns Row bytes minus the type flag. */ 1187 public static byte[] getRowOnly(BytesWritable bw) { 1188 byte[] bytes = new byte[bw.getLength() - Bytes.SIZEOF_SHORT]; 1189 System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length); 1190 return bytes; 1191 } 1192 1193 @Override 1194 public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context) 1195 throws IOException, InterruptedException { 1196 int defCount = 0; 1197 boolean lostFamilies = false; 1198 refs.clear(); 1199 for (BytesWritable type : values) { 1200 if (type.getLength() == DEF.getLength()) { 1201 defCount++; 1202 if (type.getBytes()[0] == 1) { 1203 lostFamilies = true; 1204 } 1205 } else { 1206 byte[] bytes = new byte[type.getLength()]; 1207 System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength()); 1208 refs.add(bytes); 1209 } 1210 } 1211 1212 // TODO check for more than one def, should not happen 1213 StringBuilder refsSb = null; 1214 if (defCount == 0 || refs.size() != 1) { 1215 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1216 refsSb = dumpExtraInfoOnRefs(key, context, refs); 1217 LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" 1218 + (refsSb != null ? refsSb.toString() : "")); 1219 } 1220 if (lostFamilies) { 1221 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1222 LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families"); 1223 context.getCounter(VerifyCounts.LOST_FAMILIES).increment(1); 1224 context.write(key, LOSTFAM); 1225 } 1226 1227 if (defCount == 0 && refs.size() > 0) { 1228 // This is bad, found a node that is referenced but not defined. It must have been 1229 // lost, emit some info about this node for debugging purposes. 1230 // Write out a line per reference. If more than one, flag it.; 1231 for (int i = 0; i < refs.size(); i++) { 1232 byte[] bs = refs.get(i); 1233 int ordinal; 1234 if (i <= 0) { 1235 ordinal = VerifyCounts.UNDEFINED.ordinal(); 1236 context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); 1237 context.getCounter(VerifyCounts.UNDEFINED).increment(1); 1238 } else { 1239 ordinal = VerifyCounts.EXTRA_UNDEF_REFERENCES.ordinal(); 1240 context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); 1241 } 1242 } 1243 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 1244 // Print out missing row; doing get on reference gives info on when the referencer 1245 // was added which can help a little debugging. This info is only available in mapper 1246 // output -- the 'Linked List error Key...' log message above. What we emit here is 1247 // useless for debugging. 1248 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1249 context.getCounter("undef", keyString).increment(1); 1250 } 1251 } else if (defCount > 0 && refs.isEmpty()) { 1252 // node is defined but not referenced 1253 context.write(key, UNREF); 1254 context.getCounter(VerifyCounts.UNREFERENCED).increment(1); 1255 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 1256 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1257 context.getCounter("unref", keyString).increment(1); 1258 } 1259 } else { 1260 if (refs.size() > 1) { 1261 // Skip first reference. 1262 for (int i = 1; i < refs.size(); i++) { 1263 context.write(key, new BytesWritable( 1264 addPrefixFlag(VerifyCounts.EXTRAREFERENCES.ordinal(), refs.get(i)))); 1265 } 1266 context.getCounter(VerifyCounts.EXTRAREFERENCES).increment(refs.size() - 1); 1267 } 1268 // node is defined and referenced 1269 context.getCounter(VerifyCounts.REFERENCED).increment(1); 1270 } 1271 } 1272 1273 /** 1274 * Dump out extra info around references if there are any. Helps debugging. 1275 * @return StringBuilder filled with references if any. 1276 */ 1277 @SuppressWarnings("JavaUtilDate") 1278 private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context, 1279 final List<byte[]> refs) throws IOException { 1280 StringBuilder refsSb = null; 1281 if (refs.isEmpty()) return refsSb; 1282 refsSb = new StringBuilder(); 1283 String comma = ""; 1284 // If a row is a reference but has no define, print the content of the row that has 1285 // this row as a 'prev'; it will help debug. The missing row was written just before 1286 // the row we are dumping out here. 1287 TableName tn = getTableName(context.getConfiguration()); 1288 try (Table t = this.connection.getTable(tn)) { 1289 for (byte[] ref : refs) { 1290 Result r = t.get(new Get(ref)); 1291 List<Cell> cells = r.listCells(); 1292 String ts = (cells != null && !cells.isEmpty()) 1293 ? new java.util.Date(cells.get(0).getTimestamp()).toString() 1294 : ""; 1295 byte[] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT); 1296 String jobStr = (b != null && b.length > 0) ? Bytes.toString(b) : ""; 1297 b = r.getValue(FAMILY_NAME, COLUMN_COUNT); 1298 long count = (b != null && b.length > 0) ? Bytes.toLong(b) : -1; 1299 b = r.getValue(FAMILY_NAME, COLUMN_PREV); 1300 String refRegionLocation = ""; 1301 String keyRegionLocation = ""; 1302 if (b != null && b.length > 0) { 1303 try (RegionLocator rl = this.connection.getRegionLocator(tn)) { 1304 HRegionLocation hrl = rl.getRegionLocation(b); 1305 if (hrl != null) refRegionLocation = hrl.toString(); 1306 // Key here probably has trailing zeros on it. 1307 hrl = rl.getRegionLocation(key.getBytes()); 1308 if (hrl != null) keyRegionLocation = hrl.toString(); 1309 } 1310 } 1311 LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) 1312 + ", refPrevEqualsKey=" 1313 + (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) 1314 + ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) 1315 + ", ref row date=" + ts + ", jobStr=" + jobStr + ", ref row count=" + count 1316 + ", ref row regionLocation=" + refRegionLocation + ", key row regionLocation=" 1317 + keyRegionLocation); 1318 refsSb.append(comma); 1319 comma = ","; 1320 refsSb.append(Bytes.toStringBinary(ref)); 1321 } 1322 } 1323 return refsSb; 1324 } 1325 } 1326 1327 @Override 1328 public int run(String[] args) throws Exception { 1329 if (args.length != 2) { 1330 System.out 1331 .println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>"); 1332 return 0; 1333 } 1334 1335 String outputDir = args[0]; 1336 int numReducers = Integer.parseInt(args[1]); 1337 1338 return run(outputDir, numReducers); 1339 } 1340 1341 public int run(String outputDir, int numReducers) throws Exception { 1342 return run(new Path(outputDir), numReducers); 1343 } 1344 1345 public int run(Path outputDir, int numReducers) throws Exception { 1346 LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers); 1347 1348 job = Job.getInstance(getConf()); 1349 1350 job.setJobName("Link Verifier"); 1351 job.setNumReduceTasks(numReducers); 1352 job.setJarByClass(getClass()); 1353 1354 setJobScannerConf(job); 1355 1356 Scan scan = new Scan(); 1357 scan.addColumn(FAMILY_NAME, COLUMN_PREV); 1358 scan.setCaching(10000); 1359 scan.setCacheBlocks(false); 1360 if (isMultiUnevenColumnFamilies(getConf())) { 1361 scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME); 1362 scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME); 1363 } 1364 1365 TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan, 1366 VerifyMapper.class, BytesWritable.class, BytesWritable.class, job); 1367 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 1368 AbstractHBaseTool.class); 1369 1370 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 1371 1372 job.setReducerClass(VerifyReducer.class); 1373 job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class); 1374 job.setOutputKeyClass(BytesWritable.class); 1375 job.setOutputValueClass(BytesWritable.class); 1376 TextOutputFormat.setOutputPath(job, outputDir); 1377 1378 boolean success = job.waitForCompletion(true); 1379 1380 if (success) { 1381 Counters counters = job.getCounters(); 1382 if (null == counters) { 1383 LOG.warn("Counters were null, cannot verify Job completion." 1384 + " This is commonly a result of insufficient YARN configuration."); 1385 // We don't have access to the counters to know if we have "bad" counts 1386 return 0; 1387 } 1388 1389 // If we find no unexpected values, the job didn't outright fail 1390 if (verifyUnexpectedValues(counters)) { 1391 // We didn't check referenced+unreferenced counts, leave that to visual inspection 1392 return 0; 1393 } 1394 } 1395 1396 // We failed 1397 return 1; 1398 } 1399 1400 public boolean verify(long expectedReferenced) throws Exception { 1401 if (job == null) { 1402 throw new IllegalStateException("You should call run() first"); 1403 } 1404 1405 Counters counters = job.getCounters(); 1406 if (counters == null) { 1407 LOG.info("Counters object was null, write verification cannot be performed." 1408 + " This is commonly a result of insufficient YARN configuration."); 1409 return false; 1410 } 1411 1412 // Run through each check, even if we fail one early 1413 boolean success = verifyExpectedValues(expectedReferenced, counters); 1414 1415 if (!verifyUnexpectedValues(counters)) { 1416 // We found counter objects which imply failure 1417 success = false; 1418 } 1419 1420 if (!success) { 1421 handleFailure(counters); 1422 } 1423 return success; 1424 } 1425 1426 /** 1427 * Verify the values in the Counters against the expected number of entries written. Expected 1428 * number of referenced entrires The Job's Counters object 1429 * @return True if the values match what's expected, false otherwise 1430 */ 1431 protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) { 1432 final Counter referenced = counters.findCounter(VerifyCounts.REFERENCED); 1433 final Counter unreferenced = counters.findCounter(VerifyCounts.UNREFERENCED); 1434 boolean success = true; 1435 1436 if (expectedReferenced != referenced.getValue()) { 1437 LOG.error("Expected referenced count does not match with actual referenced count. " 1438 + "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue()); 1439 success = false; 1440 } 1441 1442 if (unreferenced.getValue() > 0) { 1443 final Counter multiref = counters.findCounter(VerifyCounts.EXTRAREFERENCES); 1444 boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue()); 1445 LOG.error( 1446 "Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue() 1447 + (couldBeMultiRef ? "; could be due to duplicate random numbers" : "")); 1448 success = false; 1449 } 1450 1451 return success; 1452 } 1453 1454 /** 1455 * Verify that the Counters don't contain values which indicate an outright failure from the 1456 * Reducers. The Job's counters 1457 * @return True if the "bad" counter objects are 0, false otherwise 1458 */ 1459 protected boolean verifyUnexpectedValues(Counters counters) { 1460 final Counter undefined = counters.findCounter(VerifyCounts.UNDEFINED); 1461 final Counter lostfamilies = counters.findCounter(VerifyCounts.LOST_FAMILIES); 1462 boolean success = true; 1463 1464 if (undefined.getValue() > 0) { 1465 LOG.error("Found an undefined node. Undefined count=" + undefined.getValue()); 1466 success = false; 1467 } 1468 1469 if (lostfamilies.getValue() > 0) { 1470 LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue()); 1471 success = false; 1472 } 1473 1474 return success; 1475 } 1476 1477 protected void handleFailure(Counters counters) throws IOException { 1478 Configuration conf = job.getConfiguration(); 1479 TableName tableName = getTableName(conf); 1480 try (Connection conn = ConnectionFactory.createConnection(conf)) { 1481 try (RegionLocator rl = conn.getRegionLocator(tableName)) { 1482 CounterGroup g = counters.getGroup("undef"); 1483 Iterator<Counter> it = g.iterator(); 1484 while (it.hasNext()) { 1485 String keyString = it.next().getName(); 1486 byte[] key = Bytes.toBytes(keyString); 1487 HRegionLocation loc = rl.getRegionLocation(key, true); 1488 LOG.error("undefined row " + keyString + ", " + loc); 1489 } 1490 g = counters.getGroup("unref"); 1491 it = g.iterator(); 1492 while (it.hasNext()) { 1493 String keyString = it.next().getName(); 1494 byte[] key = Bytes.toBytes(keyString); 1495 HRegionLocation loc = rl.getRegionLocation(key, true); 1496 LOG.error("unreferred row " + keyString + ", " + loc); 1497 } 1498 } 1499 } 1500 } 1501 } 1502 1503 /** 1504 * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration 1505 * adds more data. 1506 */ 1507 static class Loop extends Configured implements Tool { 1508 1509 private static final Logger LOG = LoggerFactory.getLogger(Loop.class); 1510 private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " 1511 + "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" 1512 + " <num walker threads>] \n" 1513 + "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" 1514 + "walkers will select and verify random flushed loop during Generation."; 1515 1516 IntegrationTestBigLinkedList it; 1517 1518 protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width, 1519 Integer wrapMultiplier, Integer numWalkers) throws Exception { 1520 Path outputPath = new Path(outputDir); 1521 UUID uuid = UUID.randomUUID(); // create a random UUID. 1522 Path generatorOutput = new Path(outputPath, uuid.toString()); 1523 1524 Generator generator = new Generator(); 1525 generator.setConf(getConf()); 1526 int retCode = 1527 generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers); 1528 if (retCode > 0) { 1529 throw new RuntimeException("Generator failed with return code: " + retCode); 1530 } 1531 if (numWalkers > 0) { 1532 if (!generator.verify()) { 1533 throw new RuntimeException("Generator.verify failed"); 1534 } 1535 } 1536 LOG.info("Generator finished with success. Total nodes=" + numNodes); 1537 } 1538 1539 protected void runVerify(String outputDir, int numReducers, long expectedNumNodes) 1540 throws Exception { 1541 Path outputPath = new Path(outputDir); 1542 UUID uuid = UUID.randomUUID(); // create a random UUID. 1543 Path iterationOutput = new Path(outputPath, uuid.toString()); 1544 1545 Verify verify = new Verify(); 1546 verify.setConf(getConf()); 1547 int retCode = verify.run(iterationOutput, numReducers); 1548 if (retCode > 0) { 1549 throw new RuntimeException("Verify.run failed with return code: " + retCode); 1550 } 1551 1552 if (!verify.verify(expectedNumNodes)) { 1553 throw new RuntimeException("Verify.verify failed"); 1554 } 1555 LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes); 1556 } 1557 1558 @Override 1559 public int run(String[] args) throws Exception { 1560 if (args.length < 5) { 1561 System.err.println(USAGE); 1562 return 1; 1563 } 1564 try { 1565 int numIterations = Integer.parseInt(args[0]); 1566 int numMappers = Integer.parseInt(args[1]); 1567 long numNodes = Long.parseLong(args[2]); 1568 String outputDir = args[3]; 1569 int numReducers = Integer.parseInt(args[4]); 1570 Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]); 1571 Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]); 1572 Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]); 1573 1574 long expectedNumNodes = 0; 1575 1576 if (numIterations < 0) { 1577 numIterations = Integer.MAX_VALUE; // run indefinitely (kind of) 1578 } 1579 LOG.info("Running Loop with args:" + Arrays.deepToString(args)); 1580 for (int i = 0; i < numIterations; i++) { 1581 LOG.info("Starting iteration = " + i); 1582 runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers); 1583 expectedNumNodes += numMappers * numNodes; 1584 runVerify(outputDir, numReducers, expectedNumNodes); 1585 } 1586 return 0; 1587 } catch (NumberFormatException e) { 1588 System.err.println("Parsing loop arguments failed: " + e.getMessage()); 1589 System.err.println(USAGE); 1590 return 1; 1591 } 1592 } 1593 } 1594 1595 /** 1596 * A stand alone program that prints out portions of a list created by {@link Generator} 1597 */ 1598 private static class Print extends Configured implements Tool { 1599 @Override 1600 public int run(String[] args) throws Exception { 1601 Options options = new Options(); 1602 options.addOption("s", "start", true, "start key"); 1603 options.addOption("e", "end", true, "end key"); 1604 options.addOption("l", "limit", true, "number to print"); 1605 1606 GnuParser parser = new GnuParser(); 1607 CommandLine cmd = null; 1608 try { 1609 cmd = parser.parse(options, args); 1610 if (cmd.getArgs().length != 0) { 1611 throw new ParseException("Command takes no arguments"); 1612 } 1613 } catch (ParseException e) { 1614 System.err.println("Failed to parse command line " + e.getMessage()); 1615 System.err.println(); 1616 HelpFormatter formatter = new HelpFormatter(); 1617 formatter.printHelp(getClass().getSimpleName(), options); 1618 System.exit(-1); 1619 } 1620 1621 Connection connection = ConnectionFactory.createConnection(getConf()); 1622 Table table = connection.getTable(getTableName(getConf())); 1623 1624 Scan scan = new Scan(); 1625 scan.setBatch(10000); 1626 1627 if (cmd.hasOption("s")) scan.withStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s"))); 1628 1629 if (cmd.hasOption("e")) { 1630 scan.withStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e"))); 1631 } 1632 1633 int limit = 0; 1634 if (cmd.hasOption("l")) limit = Integer.parseInt(cmd.getOptionValue("l")); 1635 else limit = 100; 1636 1637 ResultScanner scanner = table.getScanner(scan); 1638 1639 CINode node = new CINode(); 1640 Result result = scanner.next(); 1641 int count = 0; 1642 while (result != null && count++ < limit) { 1643 node = getCINode(result, node); 1644 System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key), 1645 Bytes.toStringBinary(node.prev), node.count, node.client); 1646 result = scanner.next(); 1647 } 1648 scanner.close(); 1649 table.close(); 1650 connection.close(); 1651 1652 return 0; 1653 } 1654 } 1655 1656 /** 1657 * A stand alone program that deletes a single node. 1658 */ 1659 private static class Delete extends Configured implements Tool { 1660 @Override 1661 public int run(String[] args) throws Exception { 1662 if (args.length != 1) { 1663 System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>"); 1664 return 0; 1665 } 1666 byte[] val = Bytes.toBytesBinary(args[0]); 1667 1668 org.apache.hadoop.hbase.client.Delete delete = new org.apache.hadoop.hbase.client.Delete(val); 1669 1670 try (Connection connection = ConnectionFactory.createConnection(getConf()); 1671 Table table = connection.getTable(getTableName(getConf()))) { 1672 table.delete(delete); 1673 } 1674 1675 System.out.println("Delete successful"); 1676 return 0; 1677 } 1678 } 1679 1680 abstract static class WalkerBase extends Configured { 1681 protected static CINode findStartNode(Table table, byte[] startKey) throws IOException { 1682 Scan scan = new Scan(); 1683 scan.withStartRow(startKey); 1684 scan.setBatch(1); 1685 scan.addColumn(FAMILY_NAME, COLUMN_PREV); 1686 1687 long t1 = EnvironmentEdgeManager.currentTime(); 1688 ResultScanner scanner = table.getScanner(scan); 1689 Result result = scanner.next(); 1690 long t2 = EnvironmentEdgeManager.currentTime(); 1691 scanner.close(); 1692 1693 if (result != null) { 1694 CINode node = getCINode(result, new CINode()); 1695 System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key)); 1696 return node; 1697 } 1698 1699 System.out.println("FSR " + (t2 - t1)); 1700 1701 return null; 1702 } 1703 1704 protected CINode getNode(byte[] row, Table table, CINode node) throws IOException { 1705 Get get = new Get(row); 1706 get.addColumn(FAMILY_NAME, COLUMN_PREV); 1707 Result result = table.get(get); 1708 return getCINode(result, node); 1709 } 1710 } 1711 1712 /** 1713 * A stand alone program that follows a linked list created by {@link Generator} and prints timing 1714 * info. 1715 */ 1716 private static class Walker extends WalkerBase implements Tool { 1717 1718 public Walker() { 1719 } 1720 1721 @Override 1722 public int run(String[] args) throws IOException { 1723 1724 Options options = new Options(); 1725 options.addOption("n", "num", true, "number of queries"); 1726 options.addOption("s", "start", true, "key to start at, binary string"); 1727 options.addOption("l", "logevery", true, "log every N queries"); 1728 1729 GnuParser parser = new GnuParser(); 1730 CommandLine cmd = null; 1731 try { 1732 cmd = parser.parse(options, args); 1733 if (cmd.getArgs().length != 0) { 1734 throw new ParseException("Command takes no arguments"); 1735 } 1736 } catch (ParseException e) { 1737 System.err.println("Failed to parse command line " + e.getMessage()); 1738 System.err.println(); 1739 HelpFormatter formatter = new HelpFormatter(); 1740 formatter.printHelp(getClass().getSimpleName(), options); 1741 System.exit(-1); 1742 } 1743 1744 long maxQueries = Long.MAX_VALUE; 1745 if (cmd.hasOption('n')) { 1746 maxQueries = Long.parseLong(cmd.getOptionValue("n")); 1747 } 1748 boolean isSpecificStart = cmd.hasOption('s'); 1749 1750 byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null; 1751 int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1; 1752 1753 Connection connection = ConnectionFactory.createConnection(getConf()); 1754 Table table = connection.getTable(getTableName(getConf())); 1755 long numQueries = 0; 1756 // If isSpecificStart is set, only walk one list from that particular node. 1757 // Note that in case of circular (or P-shaped) list it will walk forever, as is 1758 // the case in normal run without startKey. 1759 while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) { 1760 if (!isSpecificStart) { 1761 startKey = new byte[ROWKEY_LENGTH]; 1762 Bytes.random(startKey); 1763 } 1764 CINode node = findStartNode(table, startKey); 1765 if (node == null && isSpecificStart) { 1766 System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey)); 1767 } 1768 numQueries++; 1769 while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) { 1770 byte[] prev = node.prev; 1771 long t1 = EnvironmentEdgeManager.currentTime(); 1772 node = getNode(prev, table, node); 1773 long t2 = EnvironmentEdgeManager.currentTime(); 1774 if (logEvery > 0 && numQueries % logEvery == 0) { 1775 System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev)); 1776 } 1777 numQueries++; 1778 if (node == null) { 1779 System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev)); 1780 } else if (node.prev.length == NO_KEY.length) { 1781 System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key)); 1782 } 1783 } 1784 } 1785 table.close(); 1786 connection.close(); 1787 return 0; 1788 } 1789 } 1790 1791 private static class Clean extends Configured implements Tool { 1792 @Override 1793 public int run(String[] args) throws Exception { 1794 if (args.length < 1) { 1795 System.err.println("Usage: Clean <output dir>"); 1796 return -1; 1797 } 1798 1799 Path p = new Path(args[0]); 1800 Configuration conf = getConf(); 1801 TableName tableName = getTableName(conf); 1802 try (FileSystem fs = HFileSystem.get(conf); 1803 Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) { 1804 if (admin.tableExists(tableName)) { 1805 admin.disableTable(tableName); 1806 admin.deleteTable(tableName); 1807 } 1808 1809 if (fs.exists(p)) { 1810 fs.delete(p, true); 1811 } 1812 } 1813 1814 return 0; 1815 } 1816 } 1817 1818 static TableName getTableName(Configuration conf) { 1819 return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME)); 1820 } 1821 1822 private static CINode getCINode(Result result, CINode node) { 1823 node.key = Bytes.copy(result.getRow()); 1824 if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) { 1825 node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV)); 1826 } else { 1827 node.prev = NO_KEY; 1828 } 1829 if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) { 1830 node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT)); 1831 } else { 1832 node.count = -1; 1833 } 1834 if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) { 1835 node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT)); 1836 } else { 1837 node.client = ""; 1838 } 1839 return node; 1840 } 1841 1842 @Override 1843 public void setUpCluster() throws Exception { 1844 util = getTestingUtil(getConf()); 1845 boolean isDistributed = util.isDistributedCluster(); 1846 util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE); 1847 if (!isDistributed) { 1848 util.startMiniMapReduceCluster(); 1849 } 1850 this.setConf(util.getConfiguration()); 1851 } 1852 1853 @Override 1854 public void cleanUpCluster() throws Exception { 1855 super.cleanUpCluster(); 1856 if (util.isDistributedCluster()) { 1857 util.shutdownMiniMapReduceCluster(); 1858 } 1859 } 1860 1861 private static boolean isMultiUnevenColumnFamilies(Configuration conf) { 1862 return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true); 1863 } 1864 1865 @Test 1866 public void testContinuousIngest() throws IOException, Exception { 1867 // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers> 1868 Configuration conf = getTestingUtil(getConf()).getConfiguration(); 1869 if (isMultiUnevenColumnFamilies(getConf())) { 1870 // make sure per CF flush is on 1871 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, 1872 FlushAllLargeStoresPolicy.class.getName()); 1873 } 1874 int ret = ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000", 1875 util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" }); 1876 assertEquals(0, ret); 1877 } 1878 1879 private void usage() { 1880 System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]"); 1881 printCommands(); 1882 } 1883 1884 private void printCommands() { 1885 System.err.println("Commands:"); 1886 System.err.println(" generator Map only job that generates data."); 1887 System.err.println(" verify A map reduce job that looks for holes. Check return code and"); 1888 System.err.println(" look at the counts after running. See REFERENCED and"); 1889 System.err.println(" UNREFERENCED are ok. Any UNDEFINED counts are bad. Do not run"); 1890 System.err.println(" with the Generator."); 1891 System.err.println(" walker " 1892 + "Standalone program that starts following a linked list & emits timing info."); 1893 System.err.println(" print Standalone program that prints nodes in the linked list."); 1894 System.err.println(" delete Standalone program that deletes a single node."); 1895 System.err.println(" loop Program to Loop through Generator and Verify steps"); 1896 System.err.println(" clean Program to clean all left over detritus."); 1897 System.err.println(" search Search for missing keys."); 1898 System.err.println(""); 1899 System.err.println("General options:"); 1900 System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>"); 1901 System.err.println( 1902 " Run using the <tableName> as the tablename. Defaults to " + DEFAULT_TABLE_NAME); 1903 System.err.println(" -D" + REGIONS_PER_SERVER_KEY + "=<# regions>"); 1904 System.err.println(" Create table with presplit regions per server. Defaults to " 1905 + DEFAULT_REGIONS_PER_SERVER); 1906 1907 System.err.println(" -DuseMob=<true|false>"); 1908 System.err.println( 1909 " Create table so that the mob read/write path is forced. " + "Defaults to false"); 1910 1911 System.err.flush(); 1912 } 1913 1914 @Override 1915 protected void processOptions(CommandLine cmd) { 1916 super.processOptions(cmd); 1917 String[] args = cmd.getArgs(); 1918 // get the class, run with the conf 1919 if (args.length < 1) { 1920 printUsage(this.getClass().getSimpleName() + " <general options> COMMAND [<COMMAND options>]", 1921 "General options:", ""); 1922 printCommands(); 1923 // Have to throw an exception here to stop the processing. Looks ugly but gets message across. 1924 throw new RuntimeException("Incorrect Number of args."); 1925 } 1926 toRun = args[0]; 1927 otherArgs = Arrays.copyOfRange(args, 1, args.length); 1928 } 1929 1930 @Override 1931 public int runTestFromCommandLine() throws Exception { 1932 Tool tool = null; 1933 if (toRun.equalsIgnoreCase("Generator")) { 1934 tool = new Generator(); 1935 } else if (toRun.equalsIgnoreCase("Verify")) { 1936 tool = new Verify(); 1937 } else if (toRun.equalsIgnoreCase("Loop")) { 1938 Loop loop = new Loop(); 1939 loop.it = this; 1940 tool = loop; 1941 } else if (toRun.equalsIgnoreCase("Walker")) { 1942 tool = new Walker(); 1943 } else if (toRun.equalsIgnoreCase("Print")) { 1944 tool = new Print(); 1945 } else if (toRun.equalsIgnoreCase("Delete")) { 1946 tool = new Delete(); 1947 } else if (toRun.equalsIgnoreCase("Clean")) { 1948 tool = new Clean(); 1949 } else if (toRun.equalsIgnoreCase("Search")) { 1950 tool = new Search(); 1951 } else { 1952 usage(); 1953 throw new RuntimeException("Unknown arg"); 1954 } 1955 1956 return ToolRunner.run(getConf(), tool, otherArgs); 1957 } 1958 1959 @Override 1960 public TableName getTablename() { 1961 Configuration c = getConf(); 1962 return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME)); 1963 } 1964 1965 @Override 1966 protected Set<String> getColumnFamilies() { 1967 if (isMultiUnevenColumnFamilies(getConf())) { 1968 return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME), 1969 Bytes.toString(TINY_FAMILY_NAME)); 1970 } else { 1971 return Sets.newHashSet(Bytes.toString(FAMILY_NAME)); 1972 } 1973 } 1974 1975 private static void setJobConf(Job job, int numMappers, long numNodes, Integer width, 1976 Integer wrapMultiplier, Integer numWalkers) { 1977 job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers); 1978 job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes); 1979 if (width != null) { 1980 job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width); 1981 } 1982 if (wrapMultiplier != null) { 1983 job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier); 1984 } 1985 if (numWalkers != null) { 1986 job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers); 1987 } 1988 } 1989 1990 public static void setJobScannerConf(Job job) { 1991 job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000); 1992 } 1993 1994 public static void main(String[] args) throws Exception { 1995 Configuration conf = HBaseConfiguration.create(); 1996 IntegrationTestingUtility.setUseDistributedCluster(conf); 1997 int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args); 1998 System.exit(ret); 1999 } 2000}