/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * Test Bulk Load and MR on a distributed cluster.
 * It starts an MR job that creates linked chains.
 *
 * The format of rows is like this:
 * Row Key -> Long
 *
 * L:<< Chain Id >> -> Row Key of the next link in the chain
 * S:<< Chain Id >> -> The step in the chain that this link is.
 * D:<< Chain Id >> -> Random Data.
 *
 * All chains start on row 0.
 * All rk's are > 0.
 *
 * After creating the linked lists, they are walked over using a TableMapper based Mapreduce Job.
 *
 * There are a few options exposed:
 *
 * hbase.IntegrationTestBulkLoad.chainLength
 * The number of rows that will be part of each and every chain.
 *
 * hbase.IntegrationTestBulkLoad.numMaps
 * The number of mappers that will be run. Each mapper creates one linked list chain.
 *
 * hbase.IntegrationTestBulkLoad.numImportRounds
 * How many jobs will be run to create linked lists.
 *
 * hbase.IntegrationTestBulkLoad.tableName
 * The name of the table.
 *
 * hbase.IntegrationTestBulkLoad.replicaCount
 * How many region replicas to configure for the table under test.
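 *
 * Example invocation (added illustration, not from the original source; the chainLength value
 * below matches this class's default, the other values are hypothetical). The test is driven
 * through ToolRunner, so the options above are passed as -D properties:
 *
 *   hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad \
 *       -Dhbase.IntegrationTestBulkLoad.chainLength=500000 \
 *       -Dhbase.IntegrationTestBulkLoad.numMaps=10 \
 *       -Dhbase.IntegrationTestBulkLoad.numImportRounds=2 \
 *       -Dhbase.IntegrationTestBulkLoad.tableName=IntegrationTestBulkLoad
 *
 * The no-arg "load" and "check" options added in addOptions() limit a run to just the load phase
 * or just the check phase.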
 */
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBulkLoad.class);

  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static final byte[] SORT_FAM = Bytes.toBytes("S");
  private static final byte[] DATA_FAM = Bytes.toBytes("D");

  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static int CHAIN_LENGTH = 500000;

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static int NUM_MAPS = 1;

  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static int NUM_IMPORT_ROUNDS = 1;

  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";

  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static String TABLE_NAME = "IntegrationTestBulkLoad";

  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static int NUM_REPLICA_COUNT_DEFAULT = 1;

  private static final String OPT_LOAD = "load";
  private static final String OPT_CHECK = "check";

  private boolean load = false;
  private boolean check = false;

  public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(2000);
    Random r = new Random();
    AtomicLong countOfNext = new AtomicLong(0);
    AtomicLong countOfOpen = new AtomicLong(0);
    public SlowMeCoproScanOperations() {}

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {
      if (countOfOpen.incrementAndGet() == 2) { // slow down the second openScanner call
        slowdownCode(e);
      }
    }

    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results,
        final int limit, final boolean hasMore) throws IOException {
      // this will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica
      countOfNext.incrementAndGet();
      if (countOfNext.get() == 0 || countOfNext.get() == 4) {
        slowdownCode(e);
      }
      return true;
    }

    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      }
    }
  }

  /**
   * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
   */
  private void installSlowingCoproc() throws IOException, InterruptedException {
    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    Admin admin = util.getAdmin();
    TableDescriptor desc = admin.getDescriptor(t);
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc);
    builder.setCoprocessor(SlowMeCoproScanOperations.class.getName());
    HBaseTestingUtility.modifyTableSync(admin, builder.build());
  }

  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    installSlowingCoproc();
    runCheckWithRetry();
  }

  public void runLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    LOG.info("Running load with numIterations:" + numImportRounds);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
  }

  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }

  private void setupTable() throws IOException, InterruptedException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }

    util.createTable(
        getTablename(),
        new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
        getSplits(16)
    );

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
  }

  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
        EnvironmentEdgeManager.currentTime();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p = null;
    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
      p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
    } else {
      p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
    }

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.setInt(ROUND_NUM_KEY, iteration);

    Job job = new Job(conf);

    job.setJobName(jobName);

    // set the input format so that we can create map tasks with no data input.
    job.setInputFormatClass(ITBulkLoadInputFormat.class);

    // Set the mapper classes.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // Use the identity reducer
    // So nothing to do here.

    // Set this jar.
    job.setJarByClass(getClass());

    // Set where to place the hfiles.
    FileOutputFormat.setOutputPath(job, p);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(getTablename());
        RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {

      // Configure the partitioner and other things needed for HFileOutputFormat.
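      // Added commentary (an assumption about HFileOutputFormat2, not a comment from the
      // original source): configureIncrementalLoad is expected to set the output format, pick a
      // sort reducer matching the map output value class, install a total-order partitioner
      // seeded with the table's current region boundaries, and use one reduce task per region,
      // so the HFiles written below line up with existing regions for the bulk load.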
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);

      // Run the job making sure it works.
      assertEquals(true, job.waitForCompletion(true));

      // Create a new loader.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

      // Load the HFiles in.
      loader.doBulkLoad(p, admin, table, regionLocator);
    }

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException { }
    @Override
    public void readFields(DataInput in) throws IOException { }
    @Override
    public long getLength() { return 0L; }
    @Override
    public String[] getLocations() { return new String[0]; }
  }

  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
    private int index = -1;
    private K[] keys;
    private V[] values;

    public FixedRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException { }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return ++index < keys.length;
    }
    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return keys[index];
    }
    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return values[index];
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float) index / keys.length;
    }
    @Override
    public void close() throws IOException {
    }
  }

  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      ArrayList<InputSplit> ret = new ArrayList<>(numSplits);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context)
        throws IOException, InterruptedException {
      int taskId = context.getTaskAttemptID().getTaskID().getId();
      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

      taskId = taskId + iteration * numMapTasks;
      numMapTasks = numMapTasks * numIterations;

      long chainId = Math.abs(new Random().nextLong());
      // ensure that chainId is unique per task and across iterations
      chainId = chainId - (chainId % numMapTasks) + taskId;
      LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};

      return new FixedRecordReader<>(keys, keys);
    }
  }

  /**
   * Mapper that creates a linked list of KeyValues.
   *
   * Each map task generates one linked list.
   * All lists start on row key 0L.
   * All lists should be CHAIN_LENGTH long.
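   *
   * Added illustration (not from the original source): for a chain with id C of length 3, the
   * mapper emits roughly the following cells, where r1, r2 and r3 are random row keys returned
   * by getNextRow, and the last link points at a row that is never written:
   *
   *   row 0   -> L:C = r1, S:C = 0, D:C = random data
   *   row r1  -> L:C = r2, S:C = 1, D:C = random data
   *   row r2  -> L:C = r3, S:C = 2, D:C = random data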
   */
  public static class LinkedListCreationMapper
      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

    private Random rand = new Random();

    @Override
    protected void map(LongWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      long chainId = value.get();
      LOG.info("Starting mapper with chainId:" + chainId);

      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;

      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      long nextRow = getNextRow(0, chainLength);

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // Next link in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // What link in the chain this is.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Added data so that large stores are created.
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
            Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
        );

        // Emit the key values.
        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);
        // Move to the next row.
        currentRow = nextRow;
        nextRow = getNextRow(i + 1, chainLength);
      }
    }

    /** Returns a unique row id within this chain for this index */
    private long getNextRow(long index, long chainLength) {
      long nextRow = Math.abs(rand.nextLong());
      // use significant bits from the random number, but pad with index to ensure it is unique
      // this also ensures that we do not reuse row = 0
      // row collisions from multiple mappers are fine, since we guarantee unique chainIds
      nextRow = nextRow - (nextRow % chainLength) + index;
      return nextRow;
    }
  }

  /**
   * Writable class used as the key to group links in the linked list.
   *
   * Used as the key emitted from a pass over the table.
   */
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;

    public Long getOrder() {
      return order;
    }

    public Long getChainId() {
      return chainId;
    }

    private Long order;

    public LinkKey() {}

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Writable used as the value emitted from a pass over the hbase table.
   */
  public static class LinkChain implements WritableComparable<LinkChain> {

    public Long getNext() {
      return next;
    }

    public Long getRk() {
      return rk;
    }

    public LinkChain() {}

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    private Long rk;
    private Long next;

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Class to figure out what partition to send a link in the chain to. This is based upon
   * the linkKey's ChainId.
   */
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey,
                            LinkChain linkChain,
                            int numPartitions) {
      int hash = linkKey.getChainId().hashCode();
      return Math.abs(hash % numPartitions);
    }
  }

  /**
   * Comparator used to figure out if a linkKey should be grouped together. This is based upon the
   * linkKey's ChainId.
   */
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }

  /**
   * Comparator used to order linkKeys so that they are passed to a reducer in order. This is based
   * upon linkKey ChainId and Order.
   */
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }

  /**
   * Mapper to pass over the table.
   *
   * For every row there could be multiple chains that landed on this row. So emit a linkKey
   * and value for each.
   */
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
        long order = Bytes.toLong(CellUtil.cloneValue(c));
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }

  /**
   * Class that does the actual checking of the links.
   *
   * All links in the chain should be grouped and sorted when sent to this class.
   * Then the chain will be traversed, making sure that no link is missing and that the chain is
   * the correct length.
   *
   * This will throw an exception if anything is not correct. That causes the job to fail if any
   * data is corrupt.
   */
  public static class LinkedListCheckingReducer
      extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    @Override
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
        throws java.io.IOException, java.lang.InterruptedException {
      long next = -1L;
      long prev = -1L;
      long count = 0L;

      for (LinkChain lc : values) {

        if (next == -1) {
          if (lc.getRk() != 0L) {
            String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
                + ". Chain:" + key.chainId + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          next = lc.getNext();
        } else {
          if (next != lc.getRk()) {
            String msg = "Missing a link in the chain. Prev rk was " + prev + ", expecting "
                + next + " but got " + lc.getRk() + ". Chain:" + key.chainId
                + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          prev = lc.getRk();
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen) {
        String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got "
            + count + ". Chain:" + key.chainId + ", order:" + key.order;
        logError(msg, context);
        throw new RuntimeException(msg);
      }
    }

    private static void logError(String msg, Context context) throws IOException {
      TableName table = getTableName(context.getConfiguration());

      LOG.error("Failure in chain verification: " + msg);
      try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
          Admin admin = connection.getAdmin()) {
        LOG.error("cluster status:\n" + admin.getClusterStatus());
        LOG.error("table regions:\n"
            + Joiner.on("\n").join(admin.getTableRegions(table)));
      }
    }
  }

  private void runCheckWithRetry()
      throws IOException, ClassNotFoundException, InterruptedException {
    try {
      runCheck();
    } catch (Throwable t) {
      LOG.warn("Received " + StringUtils.stringifyException(t));
      LOG.warn("Running the check MR Job again to see whether this was an ephemeral problem or not");
      runCheck();
      throw t; // we should still fail the test even if the second run succeeds
    }
    // everything green
  }

  /**
   * After adding data to the table, start an MR job that walks the chains and verifies that each
   * chain is complete and of the expected length.
   */
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    LOG.info("Running check");
    Configuration conf = getConf();
    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = new Job(conf);
    job.setJarByClass(getClass());
    job.setJobName(jobName);

    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    Scan scan = new Scan();
    scan.addFamily(CHAIN_FAM);
    scan.addFamily(SORT_FAM);
    scan.setMaxVersions(1);
    scan.setCacheBlocks(false);
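    // Added commentary (not a comment from the original source): block caching is skipped since
    // this verification job streams every row exactly once, and the batch size below caps the
    // number of cells returned per Result, which matters when many chains land on the same row.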
    scan.setBatch(1000);

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      scan.setConsistency(Consistency.TIMELINE);
    }

    TableMapReduceUtil.initTableMapperJob(
        getTablename().getName(),
        scan,
        LinkedListCheckingMapper.class,
        LinkKey.class,
        LinkChain.class,
        job
    );

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
          Integer.toString(util.getAdmin()
              .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
              .getLiveServerMetrics().size() * 10)
      );
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    super.addOptNoArg(OPT_CHECK, "Run check only");
    super.addOptNoArg(OPT_LOAD, "Run load only");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    check = cmd.hasOption(OPT_CHECK);
    load = cmd.hasOption(OPT_LOAD);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    if (load) {
      runLoad();
    } else if (check) {
      installSlowingCoproc();
      runCheckWithRetry();
    } else {
      testBulkLoad();
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }

  public static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM),
        Bytes.toString(SORT_FAM));
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }
}