001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.DataInput; 024import java.io.DataOutput; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Map; 029import java.util.Optional; 030import java.util.Set; 031import java.util.concurrent.ThreadLocalRandom; 032import java.util.concurrent.atomic.AtomicLong; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.CellUtil; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.HBaseTestingUtil; 039import org.apache.hadoop.hbase.IntegrationTestBase; 040import org.apache.hadoop.hbase.IntegrationTestingUtility; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Admin; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.ConnectionFactory; 
046import org.apache.hadoop.hbase.client.Consistency; 047import org.apache.hadoop.hbase.client.RegionLocator; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.Scan; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.coprocessor.ObserverContext; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 055import org.apache.hadoop.hbase.coprocessor.RegionObserver; 056import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 057import org.apache.hadoop.hbase.regionserver.InternalScanner; 058import org.apache.hadoop.hbase.testclassification.IntegrationTests; 059import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 062import org.apache.hadoop.hbase.util.RegionSplitter; 063import org.apache.hadoop.io.LongWritable; 064import org.apache.hadoop.io.NullWritable; 065import org.apache.hadoop.io.Writable; 066import org.apache.hadoop.io.WritableComparable; 067import org.apache.hadoop.io.WritableComparator; 068import org.apache.hadoop.io.WritableUtils; 069import org.apache.hadoop.mapreduce.InputFormat; 070import org.apache.hadoop.mapreduce.InputSplit; 071import org.apache.hadoop.mapreduce.Job; 072import org.apache.hadoop.mapreduce.JobContext; 073import org.apache.hadoop.mapreduce.Mapper; 074import org.apache.hadoop.mapreduce.Partitioner; 075import org.apache.hadoop.mapreduce.RecordReader; 076import org.apache.hadoop.mapreduce.Reducer; 077import org.apache.hadoop.mapreduce.TaskAttemptContext; 078import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 079import org.apache.hadoop.util.StringUtils; 080import org.apache.hadoop.util.ToolRunner; 081import org.junit.jupiter.api.Tag; 082import org.junit.jupiter.api.Test; 083import org.slf4j.Logger; 
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * Test Bulk Load and MR on a distributed cluster. It starts an MR job that creates linked chains.
 * The format of rows is like this: Row Key -> Long L:<< Chain Id >> -> Row Key of the next link in
 * the chain S:<< Chain Id >> -> The step in the chain that this link is. D:<< Chain Id >> -> Random
 * Data. All chains start on row 0. All rk's are > 0. After creating the linked lists they are
 * walked over using a TableMapper based MapReduce Job. There are a few options exposed:
 * hbase.IntegrationTestBulkLoad.chainLength The number of rows that will be part of each and every
 * chain. hbase.IntegrationTestBulkLoad.numMaps The number of mappers that will be run. Each mapper
 * creates one linked list chain. hbase.IntegrationTestBulkLoad.numImportRounds How many jobs will
 * be run to create linked lists. hbase.IntegrationTestBulkLoad.tableName The name of the table.
 * hbase.IntegrationTestBulkLoad.replicaCount How many region replicas to configure for the table
 * under test.
102 */ 103@Tag(IntegrationTests.TAG) 104public class IntegrationTestBulkLoad extends IntegrationTestBase { 105 106 private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBulkLoad.class); 107 108 private static final byte[] CHAIN_FAM = Bytes.toBytes("L"); 109 private static final byte[] SORT_FAM = Bytes.toBytes("S"); 110 private static final byte[] DATA_FAM = Bytes.toBytes("D"); 111 112 private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength"; 113 private static int CHAIN_LENGTH = 500000; 114 115 private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps"; 116 private static int NUM_MAPS = 1; 117 118 private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds"; 119 private static int NUM_IMPORT_ROUNDS = 1; 120 121 private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum"; 122 123 private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName"; 124 private static String TABLE_NAME = "IntegrationTestBulkLoad"; 125 126 private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount"; 127 private static int NUM_REPLICA_COUNT_DEFAULT = 1; 128 129 private static final String OPT_LOAD = "load"; 130 private static final String OPT_CHECK = "check"; 131 132 private boolean load = false; 133 private boolean check = false; 134 135 public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver { 136 static final AtomicLong sleepTime = new AtomicLong(2000); 137 AtomicLong countOfNext = new AtomicLong(0); 138 AtomicLong countOfOpen = new AtomicLong(0); 139 140 public SlowMeCoproScanOperations() { 141 } 142 143 @Override 144 public Optional<RegionObserver> getRegionObserver() { 145 return Optional.of(this); 146 } 147 148 @Override 149 public void preScannerOpen(final ObserverContext<? 
extends RegionCoprocessorEnvironment> e, 150 final Scan scan) throws IOException { 151 if (countOfOpen.incrementAndGet() == 2) { // slowdown openScanner randomly 152 slowdownCode(e); 153 } 154 } 155 156 @Override 157 public boolean preScannerNext(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 158 final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore) 159 throws IOException { 160 // this will slow down a certain next operation if the conditions are met. The slowness 161 // will allow the call to go to a replica 162 countOfNext.incrementAndGet(); 163 if (countOfNext.get() == 0 || countOfNext.get() == 4) { 164 slowdownCode(e); 165 } 166 return true; 167 } 168 169 protected void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e) { 170 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { 171 try { 172 if (sleepTime.get() > 0) { 173 LOG.info("Sleeping for " + sleepTime.get() + " ms"); 174 Thread.sleep(sleepTime.get()); 175 } 176 } catch (InterruptedException e1) { 177 LOG.error(e1.toString(), e1); 178 } 179 } 180 } 181 } 182 183 /** 184 * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}. 
185 */ 186 private void installSlowingCoproc() throws IOException, InterruptedException { 187 int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); 188 if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return; 189 190 TableName t = getTablename(); 191 Admin admin = util.getAdmin(); 192 TableDescriptor desc = admin.getDescriptor(t); 193 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc); 194 builder.setCoprocessor(SlowMeCoproScanOperations.class.getName()); 195 admin.modifyTable(builder.build()); 196 } 197 198 @Test 199 public void testBulkLoad() throws Exception { 200 runLoad(); 201 installSlowingCoproc(); 202 runCheckWithRetry(); 203 } 204 205 public void runLoad() throws Exception { 206 setupTable(); 207 int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS); 208 LOG.info("Running load with numIterations:" + numImportRounds); 209 for (int i = 0; i < numImportRounds; i++) { 210 runLinkedListMRJob(i); 211 } 212 } 213 214 private byte[][] getSplits(int numRegions) { 215 RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit(); 216 split.setFirstRow(Bytes.toBytes(0L)); 217 split.setLastRow(Bytes.toBytes(Long.MAX_VALUE)); 218 return split.split(numRegions); 219 } 220 221 private void setupTable() throws IOException, InterruptedException { 222 if (util.getAdmin().tableExists(getTablename())) { 223 util.deleteTable(getTablename()); 224 } 225 226 util.createTable(getTablename(), new byte[][] { CHAIN_FAM, SORT_FAM, DATA_FAM }, getSplits(16)); 227 228 int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); 229 if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return; 230 231 TableName t = getTablename(); 232 HBaseTestingUtil.setReplicas(util.getAdmin(), t, replicaCount); 233 } 234 235 private void runLinkedListMRJob(int iteration) throws Exception { 236 String jobName = 237 IntegrationTestBulkLoad.class.getSimpleName() + " - " + EnvironmentEdgeManager.currentTime(); 
238 Configuration conf = new Configuration(util.getConfiguration()); 239 Path p = null; 240 if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) { 241 p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration); 242 } else { 243 p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY)); 244 } 245 246 conf.setBoolean("mapreduce.map.speculative", false); 247 conf.setBoolean("mapreduce.reduce.speculative", false); 248 conf.setInt(ROUND_NUM_KEY, iteration); 249 250 Job job = new Job(conf); 251 252 job.setJobName(jobName); 253 254 // set the input format so that we can create map tasks with no data input. 255 job.setInputFormatClass(ITBulkLoadInputFormat.class); 256 257 // Set the mapper classes. 258 job.setMapperClass(LinkedListCreationMapper.class); 259 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 260 job.setMapOutputValueClass(KeyValue.class); 261 262 // Use the identity reducer 263 // So nothing to do here. 264 265 // Set this jar. 266 job.setJarByClass(getClass()); 267 268 // Set where to place the hfiles. 269 FileOutputFormat.setOutputPath(job, p); 270 try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); 271 RegionLocator regionLocator = conn.getRegionLocator(getTablename())) { 272 // Configure the partitioner and other things needed for HFileOutputFormat. 273 HFileOutputFormat2.configureIncrementalLoad(job, admin.getDescriptor(getTablename()), 274 regionLocator); 275 // Run the job making sure it works. 276 assertTrue(job.waitForCompletion(true)); 277 } 278 // Create a new loader. 279 BulkLoadHFiles loader = BulkLoadHFiles.create(conf); 280 // Load the HFiles in. 281 loader.bulkLoad(getTablename(), p); 282 // Delete the files. 
283 util.getTestFileSystem().delete(p, true); 284 } 285 286 public static class EmptySplit extends InputSplit implements Writable { 287 @Override 288 public void write(DataOutput out) throws IOException { 289 } 290 291 @Override 292 public void readFields(DataInput in) throws IOException { 293 } 294 295 @Override 296 public long getLength() { 297 return 0L; 298 } 299 300 @Override 301 public String[] getLocations() { 302 return new String[0]; 303 } 304 } 305 306 public static class FixedRecordReader<K, V> extends RecordReader<K, V> { 307 private int index = -1; 308 private K[] keys; 309 private V[] values; 310 311 public FixedRecordReader(K[] keys, V[] values) { 312 this.keys = keys; 313 this.values = values; 314 } 315 316 @Override 317 public void initialize(InputSplit split, TaskAttemptContext context) 318 throws IOException, InterruptedException { 319 } 320 321 @Override 322 public boolean nextKeyValue() throws IOException, InterruptedException { 323 return ++index < keys.length; 324 } 325 326 @Override 327 public K getCurrentKey() throws IOException, InterruptedException { 328 return keys[index]; 329 } 330 331 @Override 332 public V getCurrentValue() throws IOException, InterruptedException { 333 return values[index]; 334 } 335 336 @Override 337 public float getProgress() throws IOException, InterruptedException { 338 return (float) index / keys.length; 339 } 340 341 @Override 342 public void close() throws IOException { 343 } 344 } 345 346 public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> { 347 @Override 348 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 349 int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS); 350 ArrayList<InputSplit> ret = new ArrayList<>(numSplits); 351 for (int i = 0; i < numSplits; ++i) { 352 ret.add(new EmptySplit()); 353 } 354 return ret; 355 } 356 357 @Override 358 public RecordReader<LongWritable, LongWritable> 
createRecordReader(InputSplit split, 359 TaskAttemptContext context) throws IOException, InterruptedException { 360 int taskId = context.getTaskAttemptID().getTaskID().getId(); 361 int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS); 362 int numIterations = 363 context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS); 364 int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0); 365 366 taskId = taskId + iteration * numMapTasks; 367 numMapTasks = numMapTasks * numIterations; 368 369 long chainId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); 370 chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per 371 // task and across iterations 372 LongWritable[] keys = new LongWritable[] { new LongWritable(chainId) }; 373 374 return new FixedRecordReader<>(keys, keys); 375 } 376 } 377 378 /** 379 * Mapper that creates a linked list of KeyValues. Each map task generates one linked list. All 380 * lists start on row key 0L. All lists should be CHAIN_LENGTH long. 381 */ 382 public static class LinkedListCreationMapper 383 extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> { 384 385 @Override 386 protected void map(LongWritable key, LongWritable value, Context context) 387 throws IOException, InterruptedException { 388 long chainId = value.get(); 389 LOG.info("Starting mapper with chainId:" + chainId); 390 391 byte[] chainIdArray = Bytes.toBytes(chainId); 392 long currentRow = 0; 393 394 long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH); 395 long nextRow = getNextRow(0, chainLength); 396 byte[] valueBytes = new byte[50]; 397 398 for (long i = 0; i < chainLength; i++) { 399 byte[] rk = Bytes.toBytes(currentRow); 400 401 // Next link in the chain. 402 KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow)); 403 // What link in the chain this is. 
404 KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i)); 405 // Added data so that large stores are created. 406 Bytes.random(valueBytes); 407 KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray, valueBytes); 408 409 // Emit the key values. 410 context.write(new ImmutableBytesWritable(rk), linkKv); 411 context.write(new ImmutableBytesWritable(rk), sortKv); 412 context.write(new ImmutableBytesWritable(rk), dataKv); 413 // Move to the next row. 414 currentRow = nextRow; 415 nextRow = getNextRow(i + 1, chainLength); 416 } 417 } 418 419 /** Returns a unique row id within this chain for this index */ 420 private long getNextRow(long index, long chainLength) { 421 long nextRow = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); 422 // use significant bits from the random number, but pad with index to ensure it is unique 423 // this also ensures that we do not reuse row = 0 424 // row collisions from multiple mappers are fine, since we guarantee unique chainIds 425 nextRow = nextRow - (nextRow % chainLength) + index; 426 return nextRow; 427 } 428 } 429 430 /** 431 * Writable class used as the key to group links in the linked list. Used as the key emited from a 432 * pass over the table. 
433 */ 434 public static class LinkKey implements WritableComparable<LinkKey> { 435 436 private Long chainId; 437 438 public Long getOrder() { 439 return order; 440 } 441 442 public Long getChainId() { 443 return chainId; 444 } 445 446 private Long order; 447 448 public LinkKey() { 449 } 450 451 public LinkKey(long chainId, long order) { 452 this.chainId = chainId; 453 this.order = order; 454 } 455 456 @Override 457 public int compareTo(LinkKey linkKey) { 458 int res = getChainId().compareTo(linkKey.getChainId()); 459 if (res == 0) { 460 res = getOrder().compareTo(linkKey.getOrder()); 461 } 462 return res; 463 } 464 465 @Override 466 public void write(DataOutput dataOutput) throws IOException { 467 WritableUtils.writeVLong(dataOutput, chainId); 468 WritableUtils.writeVLong(dataOutput, order); 469 } 470 471 @Override 472 public void readFields(DataInput dataInput) throws IOException { 473 chainId = WritableUtils.readVLong(dataInput); 474 order = WritableUtils.readVLong(dataInput); 475 } 476 } 477 478 /** 479 * Writable used as the value emitted from a pass over the hbase table. 
480 */ 481 public static class LinkChain implements WritableComparable<LinkChain> { 482 483 public Long getNext() { 484 return next; 485 } 486 487 public Long getRk() { 488 return rk; 489 } 490 491 public LinkChain() { 492 } 493 494 public LinkChain(Long rk, Long next) { 495 this.rk = rk; 496 this.next = next; 497 } 498 499 private Long rk; 500 private Long next; 501 502 @Override 503 public int compareTo(LinkChain linkChain) { 504 int res = getRk().compareTo(linkChain.getRk()); 505 if (res == 0) { 506 res = getNext().compareTo(linkChain.getNext()); 507 } 508 return res; 509 } 510 511 @Override 512 public void write(DataOutput dataOutput) throws IOException { 513 WritableUtils.writeVLong(dataOutput, rk); 514 WritableUtils.writeVLong(dataOutput, next); 515 } 516 517 @Override 518 public void readFields(DataInput dataInput) throws IOException { 519 rk = WritableUtils.readVLong(dataInput); 520 next = WritableUtils.readVLong(dataInput); 521 } 522 } 523 524 /** 525 * Class to figure out what partition to send a link in the chain to. This is based upon the 526 * linkKey's ChainId. 527 */ 528 public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> { 529 @Override 530 public int getPartition(LinkKey linkKey, LinkChain linkChain, int numPartitions) { 531 int hash = linkKey.getChainId().hashCode(); 532 return Math.abs(hash % numPartitions); 533 } 534 } 535 536 /** 537 * Comparator used to figure out if a linkKey should be grouped together. This is based upon the 538 * linkKey's ChainId. 
539 */ 540 public static class NaturalKeyGroupingComparator extends WritableComparator { 541 542 protected NaturalKeyGroupingComparator() { 543 super(LinkKey.class, true); 544 } 545 546 @Override 547 public int compare(WritableComparable w1, WritableComparable w2) { 548 LinkKey k1 = (LinkKey) w1; 549 LinkKey k2 = (LinkKey) w2; 550 551 return k1.getChainId().compareTo(k2.getChainId()); 552 } 553 } 554 555 /** 556 * Comparator used to order linkKeys so that they are passed to a reducer in order. This is based 557 * upon linkKey ChainId and Order. 558 */ 559 public static class CompositeKeyComparator extends WritableComparator { 560 561 protected CompositeKeyComparator() { 562 super(LinkKey.class, true); 563 } 564 565 @Override 566 public int compare(WritableComparable w1, WritableComparable w2) { 567 LinkKey k1 = (LinkKey) w1; 568 LinkKey k2 = (LinkKey) w2; 569 570 return k1.compareTo(k2); 571 } 572 } 573 574 /** 575 * Mapper to pass over the table. For every row there could be multiple chains that landed on this 576 * row. So emit a linkKey and value for each. 577 */ 578 public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> { 579 @Override 580 protected void map(ImmutableBytesWritable key, Result value, Context context) 581 throws IOException, InterruptedException { 582 long longRk = Bytes.toLong(value.getRow()); 583 584 for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) { 585 long chainId = Bytes.toLong(entry.getKey()); 586 long next = Bytes.toLong(entry.getValue()); 587 Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0); 588 long order = Bytes.toLong(CellUtil.cloneValue(c)); 589 context.write(new LinkKey(chainId, order), new LinkChain(longRk, next)); 590 } 591 } 592 } 593 594 /** 595 * Class that does the actual checking of the links. All links in the chain should be grouped and 596 * sorted when sent to this class. 
Then the chain will be traversed making sure that no link is 597 * missing and that the chain is the correct length. This will throw an exception if anything is 598 * not correct. That causes the job to fail if any data is corrupt. 599 */ 600 public static class LinkedListCheckingReducer 601 extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> { 602 @Override 603 protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context) 604 throws java.io.IOException, java.lang.InterruptedException { 605 long next = -1L; 606 long prev = -1L; 607 long count = 0L; 608 609 for (LinkChain lc : values) { 610 611 if (next == -1) { 612 if (lc.getRk() != 0L) { 613 String msg = "Chains should all start at rk 0, but read rk " + lc.getRk() + ". Chain:" 614 + key.chainId + ", order:" + key.order; 615 logError(msg, context); 616 throw new RuntimeException(msg); 617 } 618 next = lc.getNext(); 619 } else { 620 if (next != lc.getRk()) { 621 String msg = "Missing a link in the chain. Prev rk " + prev + " was, expecting " + next 622 + " but got " + lc.getRk() + ". Chain:" + key.chainId + ", order:" + key.order; 623 logError(msg, context); 624 throw new RuntimeException(msg); 625 } 626 prev = lc.getRk(); 627 next = lc.getNext(); 628 } 629 count++; 630 } 631 632 int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH); 633 if (count != expectedChainLen) { 634 String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got " 635 + count + ". 
Chain:" + key.chainId + ", order:" + key.order; 636 logError(msg, context); 637 throw new RuntimeException(msg); 638 } 639 } 640 641 private static void logError(String msg, Context context) throws IOException { 642 TableName table = getTableName(context.getConfiguration()); 643 644 LOG.error("Failure in chain verification: " + msg); 645 try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration()); 646 Admin admin = connection.getAdmin()) { 647 LOG.error("cluster metrics:\n" + admin.getClusterMetrics()); 648 LOG.error("table regions:\n" + Joiner.on("\n").join(admin.getRegions(table))); 649 } 650 } 651 } 652 653 private void runCheckWithRetry() 654 throws IOException, ClassNotFoundException, InterruptedException { 655 try { 656 runCheck(); 657 } catch (Throwable t) { 658 LOG.warn("Received " + StringUtils.stringifyException(t)); 659 LOG.warn("Running the check MR Job again to see whether an ephemeral problem or not"); 660 runCheck(); 661 throw t; // we should still fail the test even if second retry succeeds 662 } 663 // everything green 664 } 665 666 /** 667 * After adding data to the table start a mr job to 668 */ 669 private void runCheck() throws IOException, ClassNotFoundException, InterruptedException { 670 LOG.info("Running check"); 671 Configuration conf = getConf(); 672 String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime(); 673 Path p = util.getDataTestDirOnTestFS(jobName); 674 675 Job job = new Job(conf); 676 job.setJarByClass(getClass()); 677 job.setJobName(jobName); 678 679 job.setPartitionerClass(NaturalKeyPartitioner.class); 680 job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class); 681 job.setSortComparatorClass(CompositeKeyComparator.class); 682 683 Scan scan = new Scan(); 684 scan.addFamily(CHAIN_FAM); 685 scan.addFamily(SORT_FAM); 686 scan.readVersions(1); 687 scan.setCacheBlocks(false); 688 scan.setBatch(1000); 689 690 int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, 
NUM_REPLICA_COUNT_DEFAULT); 691 if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) { 692 scan.setConsistency(Consistency.TIMELINE); 693 } 694 695 TableMapReduceUtil.initTableMapperJob(getTablename().getName(), scan, 696 LinkedListCheckingMapper.class, LinkKey.class, LinkChain.class, job); 697 698 job.setReducerClass(LinkedListCheckingReducer.class); 699 job.setOutputKeyClass(NullWritable.class); 700 job.setOutputValueClass(NullWritable.class); 701 702 FileOutputFormat.setOutputPath(job, p); 703 704 assertEquals(true, job.waitForCompletion(true)); 705 706 // Delete the files. 707 util.getTestFileSystem().delete(p, true); 708 } 709 710 @Override 711 public void setUpCluster() throws Exception { 712 util = getTestingUtil(getConf()); 713 util.initializeCluster(1); 714 int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); 715 if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) { 716 LOG.debug("Region Replicas enabled: " + replicaCount); 717 } 718 719 // Scale this up on a real cluster 720 if (util.isDistributedCluster()) { 721 util.getConfiguration().setIfUnset(NUM_MAPS_KEY, 722 Integer.toString(util.getAdmin().getRegionServers().size() * 10)); 723 util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5"); 724 } else { 725 util.startMiniMapReduceCluster(); 726 } 727 } 728 729 @Override 730 protected void addOptions() { 731 super.addOptions(); 732 super.addOptNoArg(OPT_CHECK, "Run check only"); 733 super.addOptNoArg(OPT_LOAD, "Run load only"); 734 } 735 736 @Override 737 protected void processOptions(CommandLine cmd) { 738 super.processOptions(cmd); 739 check = cmd.hasOption(OPT_CHECK); 740 load = cmd.hasOption(OPT_LOAD); 741 } 742 743 @Override 744 public int runTestFromCommandLine() throws Exception { 745 if (load) { 746 runLoad(); 747 } else if (check) { 748 installSlowingCoproc(); 749 runCheckWithRetry(); 750 } else { 751 testBulkLoad(); 752 } 753 return 0; 754 } 755 756 @Override 757 public TableName 
getTablename() { 758 return getTableName(getConf()); 759 } 760 761 public static TableName getTableName(Configuration conf) { 762 return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME)); 763 } 764 765 @Override 766 protected Set<String> getColumnFamilies() { 767 return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM), 768 Bytes.toString(SORT_FAM)); 769 } 770 771 public static void main(String[] args) throws Exception { 772 Configuration conf = HBaseConfiguration.create(); 773 IntegrationTestingUtility.setUseDistributedCluster(conf); 774 int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args); 775 System.exit(status); 776 } 777}