/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * Test Bulk Load and MR on a distributed cluster.
 * It starts an MR job that creates linked chains.
 *
 * The format of rows is like this:
 * Row Key -> Long
 *
 * L:<< Chain Id >> -> Row Key of the next link in the chain
 * S:<< Chain Id >> -> The step in the chain that this link is.
 * D:<< Chain Id >> -> Random Data.
 *
 * All chains start on row 0.
 * All rk's are > 0.
 *
 * After creating the linked lists they are walked over using a TableMapper-based MapReduce job.
 *
 * There are a few options exposed:
 *
 * hbase.IntegrationTestBulkLoad.chainLength
 * The number of rows that will be part of each and every chain.
 *
 * hbase.IntegrationTestBulkLoad.numMaps
 * The number of mappers that will be run. Each mapper creates one linked list chain.
 *
 * hbase.IntegrationTestBulkLoad.numImportRounds
 * How many jobs will be run to create linked lists.
 *
 * hbase.IntegrationTestBulkLoad.tableName
 * The name of the table.
 *
 * hbase.IntegrationTestBulkLoad.replicaCount
 * How many region replicas to configure for the table under test.
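 *
 * An illustrative invocation (a sketch only; it assumes the integration-test jar is on the
 * classpath, passes the keys above as generic -D options, and uses -load / -check to run a
 * single phase):
 *
 *   hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad \
 *       -Dhbase.IntegrationTestBulkLoad.chainLength=1000000 \
 *       -Dhbase.IntegrationTestBulkLoad.numMaps=10 \
 *       -Dhbase.IntegrationTestBulkLoad.numImportRounds=2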
 */
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBulkLoad.class);

  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static final byte[] SORT_FAM = Bytes.toBytes("S");
  private static final byte[] DATA_FAM = Bytes.toBytes("D");

  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static int CHAIN_LENGTH = 500000;

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static int NUM_MAPS = 1;

  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static int NUM_IMPORT_ROUNDS = 1;

  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";

  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static String TABLE_NAME = "IntegrationTestBulkLoad";

  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static int NUM_REPLICA_COUNT_DEFAULT = 1;

  private static final String OPT_LOAD = "load";
  private static final String OPT_CHECK = "check";

  private boolean load = false;
  private boolean check = false;

  public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(2000);
    Random r = new Random();
    AtomicLong countOfNext = new AtomicLong(0);
    AtomicLong countOfOpen = new AtomicLong(0);

    public SlowMeCoproScanOperations() {}

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {
      if (countOfOpen.incrementAndGet() == 2) { // slow down the second openScanner call
        slowdownCode(e);
      }
    }

    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results,
        final int limit, final boolean hasMore) throws IOException {
      // this will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica
      countOfNext.incrementAndGet();
      if (countOfNext.get() == 0 || countOfNext.get() == 4) {
        slowdownCode(e);
      }
      return true;
    }

    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      }
    }
  }

  /**
   * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
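   * The coprocessor sleeps only on the primary replica (see {@code slowdownCode}), which gives
   * the timeline-consistent reads issued by the check job a chance to fall over to the region
   * replicas; it is only installed when a non-default replica count is configured.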
   */
  private void installSlowingCoproc() throws IOException, InterruptedException {
    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    Admin admin = util.getAdmin();
    TableDescriptor desc = admin.getDescriptor(t);
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc);
    builder.setCoprocessor(SlowMeCoproScanOperations.class.getName());
    HBaseTestingUtility.modifyTableSync(admin, builder.build());
  }

  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    installSlowingCoproc();
    runCheckWithRetry();
  }

  public void runLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    LOG.info("Running load with numIterations:" + numImportRounds);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
  }

  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }

  private void setupTable() throws IOException, InterruptedException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }

    util.createTable(
        getTablename(),
        new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
        getSplits(16)
    );

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
  }

  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
        EnvironmentEdgeManager.currentTime();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p = null;
    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
      p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
    } else {
      p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
    }

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.setInt(ROUND_NUM_KEY, iteration);

    Job job = new Job(conf);

    job.setJobName(jobName);

    // set the input format so that we can create map tasks with no data input.
    job.setInputFormatClass(ITBulkLoadInputFormat.class);

    // Set the mapper classes.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // Use the identity reducer
    // So nothing to do here.

    // Set this jar.
    job.setJarByClass(getClass());

    // Set where to place the hfiles.
    FileOutputFormat.setOutputPath(job, p);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(getTablename());
        RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {

      // Configure the partitioner and other things needed for HFileOutputFormat.
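      // (A note on the call below: configureIncrementalLoad is expected to wire up the output
      // format, a total-order partitioner aligned to the table's current region boundaries, and
      // a sorting reducer, so that each reducer writes HFiles covering a single region. See
      // HFileOutputFormat2 for the authoritative behavior.)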
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);

      // Run the job making sure it works.
      assertEquals(true, job.waitForCompletion(true));

      // Create a new loader.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

      // Load the HFiles in.
      loader.doBulkLoad(p, admin, table, regionLocator);
    }

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException { }
    @Override
    public void readFields(DataInput in) throws IOException { }
    @Override
    public long getLength() { return 0L; }
    @Override
    public String[] getLocations() { return new String[0]; }
  }

  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
    private int index = -1;
    private K[] keys;
    private V[] values;

    public FixedRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException { }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return ++index < keys.length;
    }

    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return keys[index];
    }

    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return values[index];
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float) index / keys.length;
    }

    @Override
    public void close() throws IOException {
    }
  }

  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      ArrayList<InputSplit> ret = new ArrayList<>(numSplits);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context)
        throws IOException, InterruptedException {
      int taskId = context.getTaskAttemptID().getTaskID().getId();
      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      int numIterations =
          context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

      taskId = taskId + iteration * numMapTasks;
      numMapTasks = numMapTasks * numIterations;

      // ensure that the chainId is unique per task and across iterations
      long chainId = Math.abs(new Random().nextLong());
      chainId = chainId - (chainId % numMapTasks) + taskId;

      LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};

      return new FixedRecordReader<>(keys, keys);
    }
  }

  /**
   * Mapper that creates a linked list of KeyValues.
   *
   * Each map task generates one linked list.
   * All lists start on row key 0L.
   * All lists should be CHAIN_LENGTH long.
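   *
   * For a given chain id C, each emitted row therefore carries three cells (a sketch of the
   * layout written by this mapper): L:C holds the row key of the next link, S:C holds this
   * link's position in the chain, and D:C holds 50 random alphabetic characters of padding.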
   */
  public static class LinkedListCreationMapper
      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

    private Random rand = new Random();

    @Override
    protected void map(LongWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      long chainId = value.get();
      LOG.info("Starting mapper with chainId:" + chainId);

      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;

      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      long nextRow = getNextRow(0, chainLength);

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // Next link in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // What link in the chain this is.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Added data so that large stores are created.
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
            Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
        );

        // Emit the key values.
        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);
        // Move to the next row.
        currentRow = nextRow;
        nextRow = getNextRow(i + 1, chainLength);
      }
    }

    /** Returns a unique row id within this chain for this index */
    private long getNextRow(long index, long chainLength) {
      long nextRow = Math.abs(rand.nextLong());
      // use significant bits from the random number, but pad with index to ensure it is unique
      // this also ensures that we do not reuse row = 0
      // row collisions from multiple mappers are fine, since we guarantee unique chainIds
      nextRow = nextRow - (nextRow % chainLength) + index;
      return nextRow;
    }
  }

  /**
   * Writable class used as the key to group links in the linked list.
   *
   * Used as the key emitted from a pass over the table.
   */
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;
    private Long order;

    public LinkKey() {}

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    public Long getChainId() {
      return chainId;
    }

    public Long getOrder() {
      return order;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Writable used as the value emitted from a pass over the hbase table.
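   *
   * Holds the row key of the link itself ({@code rk}) and the row key of the link that follows
   * it ({@code next}).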
   */
  public static class LinkChain implements WritableComparable<LinkChain> {

    private Long rk;
    private Long next;

    public LinkChain() {}

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    public Long getRk() {
      return rk;
    }

    public Long getNext() {
      return next;
    }

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Class to figure out what partition to send a link in the chain to. This is based upon
   * the linkKey's ChainId.
   */
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey,
                            LinkChain linkChain,
                            int numPartitions) {
      int hash = linkKey.getChainId().hashCode();
      return Math.abs(hash % numPartitions);
    }
  }

  /**
   * Comparator used to figure out whether two linkKeys should be grouped together. This is based
   * upon the linkKey's ChainId.
   */
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }

  /**
   * Comparator used to order linkKeys so that they are passed to a reducer in order. This is based
   * upon linkKey ChainId and Order.
   */
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }

  /**
   * Mapper to pass over the table.
   *
   * For every row there could be multiple chains that landed on this row. So emit a linkKey
   * and value for each.
   */
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
        long order = Bytes.toLong(CellUtil.cloneValue(c));
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }

  /**
   * Class that does the actual checking of the links.
   *
   * All links in the chain should be grouped and sorted when sent to this class.
   * Then the chain will be traversed, making sure that no link is missing and that the chain is
   * the correct length.
   *
   * This will throw an exception if anything is not correct. That causes the job to fail if any
   * data is corrupt.
   */
  public static class LinkedListCheckingReducer
      extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    @Override
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
        throws java.io.IOException, java.lang.InterruptedException {
      long next = -1L;
      long prev = -1L;
      long count = 0L;

      for (LinkChain lc : values) {

        if (next == -1) {
          if (lc.getRk() != 0L) {
            String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
                + ". Chain:" + key.chainId + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          next = lc.getNext();
        } else {
          if (next != lc.getRk()) {
            String msg = "Missing a link in the chain. Prev rk was " + prev + ", expecting "
                + next + " but got " + lc.getRk() + ". Chain:" + key.chainId
                + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          prev = lc.getRk();
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen) {
        String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got "
            + count + ". Chain:" + key.chainId + ", order:" + key.order;
        logError(msg, context);
        throw new RuntimeException(msg);
      }
    }

    private static void logError(String msg, Context context) throws IOException {
      TableName table = getTableName(context.getConfiguration());

      LOG.error("Failure in chain verification: " + msg);
      try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
          Admin admin = connection.getAdmin()) {
        LOG.error("cluster status:\n" + admin.getClusterStatus());
        LOG.error("table regions:\n"
            + Joiner.on("\n").join(admin.getTableRegions(table)));
      }
    }
  }

  private void runCheckWithRetry()
      throws IOException, ClassNotFoundException, InterruptedException {
    try {
      runCheck();
    } catch (Throwable t) {
      LOG.warn("Received " + StringUtils.stringifyException(t));
      LOG.warn("Running the check MR Job again to see whether an ephemeral problem or not");
      runCheck();
      throw t; // we should still fail the test even if second retry succeeds
    }
    // everything green
  }

  /**
   * After adding data to the table, starts an MR job that walks the chains and verifies that
   * every chain is complete and of the expected length.
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    LOG.info("Running check");
    Configuration conf = getConf();
    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = new Job(conf);
    job.setJarByClass(getClass());
    job.setJobName(jobName);

    // Secondary-sort setup: partition and group by chain id only, but sort by chain id and order,
    // so each reduce call sees one whole chain with its links already in order.
    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    Scan scan = new Scan();
    scan.addFamily(CHAIN_FAM);
    scan.addFamily(SORT_FAM);
    scan.setMaxVersions(1);
    scan.setCacheBlocks(false);
    scan.setBatch(1000);

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      scan.setConsistency(Consistency.TIMELINE);
    }

    TableMapReduceUtil.initTableMapperJob(
        getTablename().getName(),
        scan,
        LinkedListCheckingMapper.class,
        LinkKey.class,
        LinkChain.class,
        job
    );

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
          Integer.toString(util.getAdmin().getRegionServers().size() * 10));
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    super.addOptNoArg(OPT_CHECK, "Run check only");
    super.addOptNoArg(OPT_LOAD, "Run load only");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    check = cmd.hasOption(OPT_CHECK);
    load = cmd.hasOption(OPT_LOAD);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    if (load) {
      runLoad();
    } else if (check) {
      installSlowingCoproc();
      runCheckWithRetry();
    } else {
      testBulkLoad();
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }

  public static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM),
        Bytes.toString(SORT_FAM));
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }
}