001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.DataInput; 023import java.io.DataOutput; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Map; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.ThreadLocalRandom; 031import java.util.concurrent.atomic.AtomicLong; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.IntegrationTestBase; 039import org.apache.hadoop.hbase.IntegrationTestingUtility; 040import org.apache.hadoop.hbase.KeyValue; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.Connection; 044import org.apache.hadoop.hbase.client.ConnectionFactory; 045import org.apache.hadoop.hbase.client.Consistency; 046import org.apache.hadoop.hbase.client.RegionLocator; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.coprocessor.ObserverContext; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 055import org.apache.hadoop.hbase.coprocessor.RegionObserver; 056import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 057import org.apache.hadoop.hbase.regionserver.InternalScanner; 058import org.apache.hadoop.hbase.testclassification.IntegrationTests; 059import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 062import org.apache.hadoop.hbase.util.RegionSplitter; 063import org.apache.hadoop.io.LongWritable; 064import org.apache.hadoop.io.NullWritable; 065import org.apache.hadoop.io.Writable; 066import org.apache.hadoop.io.WritableComparable; 067import org.apache.hadoop.io.WritableComparator; 068import org.apache.hadoop.io.WritableUtils; 069import org.apache.hadoop.mapreduce.InputFormat; 070import org.apache.hadoop.mapreduce.InputSplit; 071import org.apache.hadoop.mapreduce.Job; 072import org.apache.hadoop.mapreduce.JobContext; 073import org.apache.hadoop.mapreduce.Mapper; 074import org.apache.hadoop.mapreduce.Partitioner; 075import org.apache.hadoop.mapreduce.RecordReader; 076import org.apache.hadoop.mapreduce.Reducer; 077import org.apache.hadoop.mapreduce.TaskAttemptContext; 078import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 079import org.apache.hadoop.util.StringUtils; 080import org.apache.hadoop.util.ToolRunner; 081import org.junit.Test; 082import org.junit.experimental.categories.Category; 083import org.slf4j.Logger; 084import org.slf4j.LoggerFactory; 085 086import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 087import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 088import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 089 090/** 091 * Test Bulk Load and MR on a distributed cluster. It starts an MR job that creates linked chains 092 * The format of rows is like this: Row Key -> Long L:<< Chain Id >> -> Row Key of the next link in 093 * the chain S:<< Chain Id >> -> The step in the chain that his link is. D:<< Chain Id >> -> Random 094 * Data. All chains start on row 0. All rk's are > 0. After creating the linked lists they are 095 * walked over using a TableMapper based Mapreduce Job. There are a few options exposed: 096 * hbase.IntegrationTestBulkLoad.chainLength The number of rows that will be part of each and every 097 * chain. hbase.IntegrationTestBulkLoad.numMaps The number of mappers that will be run. Each mapper 098 * creates on linked list chain. hbase.IntegrationTestBulkLoad.numImportRounds How many jobs will be 099 * run to create linked lists. hbase.IntegrationTestBulkLoad.tableName The name of the table. 100 * hbase.IntegrationTestBulkLoad.replicaCount How many region replicas to configure for the table 101 * under test. 102 */ 103@Category(IntegrationTests.class) 104public class IntegrationTestBulkLoad extends IntegrationTestBase { 105 106 private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBulkLoad.class); 107 108 private static final byte[] CHAIN_FAM = Bytes.toBytes("L"); 109 private static final byte[] SORT_FAM = Bytes.toBytes("S"); 110 private static final byte[] DATA_FAM = Bytes.toBytes("D"); 111 112 private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength"; 113 private static int CHAIN_LENGTH = 500000; 114 115 private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps"; 116 private static int NUM_MAPS = 1; 117 118 private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds"; 119 private static int NUM_IMPORT_ROUNDS = 1; 120 121 private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum"; 122 123 private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName"; 124 private static String TABLE_NAME = "IntegrationTestBulkLoad"; 125 126 private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount"; 127 private static int NUM_REPLICA_COUNT_DEFAULT = 1; 128 129 private static final String OPT_LOAD = "load"; 130 private static final String OPT_CHECK = "check"; 131 132 private boolean load = false; 133 private boolean check = false; 134 135 public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver { 136 static final AtomicLong sleepTime = new AtomicLong(2000); 137 AtomicLong countOfNext = new AtomicLong(0); 138 AtomicLong countOfOpen = new AtomicLong(0); 139 140 public SlowMeCoproScanOperations() { 141 } 142 143 @Override 144 public Optional<RegionObserver> getRegionObserver() { 145 return Optional.of(this); 146 } 147 148 @Override 149 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, 150 final Scan scan) throws IOException { 151 if (countOfOpen.incrementAndGet() == 2) { // slowdown openScanner randomly 152 slowdownCode(e); 153 } 154 } 155 156 @Override 157 public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, 158 final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore) 159 throws IOException { 160 // this will slow down a certain next operation if the conditions are met. The slowness 161 // will allow the call to go to a replica 162 countOfNext.incrementAndGet(); 163 if (countOfNext.get() == 0 || countOfNext.get() == 4) { 164 slowdownCode(e); 165 } 166 return true; 167 } 168 169 protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) { 170 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { 171 try { 172 if (sleepTime.get() > 0) { 173 LOG.info("Sleeping for " + sleepTime.get() + " ms"); 174 Thread.sleep(sleepTime.get()); 175 } 176 } catch (InterruptedException e1) { 177 LOG.error(e1.toString(), e1); 178 } 179 } 180 } 181 } 182 183 /** 184 * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}. 185 */ 186 private void installSlowingCoproc() throws IOException, InterruptedException { 187 int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); 188 if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return; 189 190 TableName t = getTablename(); 191 Admin admin = util.getAdmin(); 192 TableDescriptor desc = admin.getDescriptor(t); 193 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc); 194 builder.setCoprocessor(SlowMeCoproScanOperations.class.getName()); 195 HBaseTestingUtility.modifyTableSync(admin, builder.build()); 196 } 197 198 @Test 199 public void testBulkLoad() throws Exception { 200 runLoad(); 201 installSlowingCoproc(); 202 runCheckWithRetry(); 203 } 204 205 public void runLoad() throws Exception { 206 setupTable(); 207 int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS); 208 LOG.info("Running load with numIterations:" + numImportRounds); 209 for (int i = 0; i < numImportRounds; i++) { 210 runLinkedListMRJob(i); 211 } 212 } 213 214 private byte[][] getSplits(int numRegions) { 215 RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit(); 216 split.setFirstRow(Bytes.toBytes(0L)); 217 split.setLastRow(Bytes.toBytes(Long.MAX_VALUE)); 218 return split.split(numRegions); 219 } 220 221 private void setupTable() throws IOException, InterruptedException { 222 if (util.getAdmin().tableExists(getTablename())) { 223 util.deleteTable(getTablename()); 224 } 225 226 util.createTable(getTablename(), new byte[][] { CHAIN_FAM, SORT_FAM, DATA_FAM }, getSplits(16)); 227 228 int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); 229 if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return; 230 231 TableName t = getTablename(); 232 HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount); 233 } 234 235 private void runLinkedListMRJob(int iteration) throws Exception { 236 String jobName = 237 IntegrationTestBulkLoad.class.getSimpleName() + " - " + EnvironmentEdgeManager.currentTime(); 238 Configuration conf = new Configuration(util.getConfiguration()); 239 Path p = null; 240 if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) { 241 p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration); 242 } else { 243 p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY)); 244 } 245 246 conf.setBoolean("mapreduce.map.speculative", false); 247 conf.setBoolean("mapreduce.reduce.speculative", false); 248 conf.setInt(ROUND_NUM_KEY, iteration); 249 250 Job job = new Job(conf); 251 252 job.setJobName(jobName); 253 254 // set the input format so that we can create map tasks with no data input. 255 job.setInputFormatClass(ITBulkLoadInputFormat.class); 256 257 // Set the mapper classes. 258 job.setMapperClass(LinkedListCreationMapper.class); 259 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 260 job.setMapOutputValueClass(KeyValue.class); 261 262 // Use the identity reducer 263 // So nothing to do here. 264 265 // Set this jar. 266 job.setJarByClass(getClass()); 267 268 // Set where to place the hfiles. 269 FileOutputFormat.setOutputPath(job, p); 270 try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); 271 Table table = conn.getTable(getTablename()); 272 RegionLocator regionLocator = conn.getRegionLocator(getTablename())) { 273 274 // Configure the partitioner and other things needed for HFileOutputFormat. 275 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 276 277 // Run the job making sure it works. 278 assertEquals(true, job.waitForCompletion(true)); 279 280 // Create a new loader. 281 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); 282 283 // Load the HFiles in. 284 loader.doBulkLoad(p, admin, table, regionLocator); 285 } 286 287 // Delete the files. 288 util.getTestFileSystem().delete(p, true); 289 } 290 291 public static class EmptySplit extends InputSplit implements Writable { 292 @Override 293 public void write(DataOutput out) throws IOException { 294 } 295 296 @Override 297 public void readFields(DataInput in) throws IOException { 298 } 299 300 @Override 301 public long getLength() { 302 return 0L; 303 } 304 305 @Override 306 public String[] getLocations() { 307 return new String[0]; 308 } 309 } 310 311 public static class FixedRecordReader<K, V> extends RecordReader<K, V> { 312 private int index = -1; 313 private K[] keys; 314 private V[] values; 315 316 public FixedRecordReader(K[] keys, V[] values) { 317 this.keys = keys; 318 this.values = values; 319 } 320 321 @Override 322 public void initialize(InputSplit split, TaskAttemptContext context) 323 throws IOException, InterruptedException { 324 } 325 326 @Override 327 public boolean nextKeyValue() throws IOException, InterruptedException { 328 return ++index < keys.length; 329 } 330 331 @Override 332 public K getCurrentKey() throws IOException, InterruptedException { 333 return keys[index]; 334 } 335 336 @Override 337 public V getCurrentValue() throws IOException, InterruptedException { 338 return values[index]; 339 } 340 341 @Override 342 public float getProgress() throws IOException, InterruptedException { 343 return (float) index / keys.length; 344 } 345 346 @Override 347 public void close() throws IOException { 348 } 349 } 350 351 public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> { 352 @Override 353 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 354 int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS); 355 ArrayList<InputSplit> ret = new ArrayList<>(numSplits); 356 for (int i = 0; i < numSplits; ++i) { 357 ret.add(new EmptySplit()); 358 } 359 return ret; 360 } 361 362 @Override 363 public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split, 364 TaskAttemptContext context) throws IOException, InterruptedException { 365 int taskId = context.getTaskAttemptID().getTaskID().getId(); 366 int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS); 367 int numIterations = 368 context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS); 369 int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0); 370 371 taskId = taskId + iteration * numMapTasks; 372 numMapTasks = numMapTasks * numIterations; 373 374 long chainId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); 375 chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per 376 // task and across iterations 377 LongWritable[] keys = new LongWritable[] { new LongWritable(chainId) }; 378 379 return new FixedRecordReader<>(keys, keys); 380 } 381 } 382 383 /** 384 * Mapper that creates a linked list of KeyValues. Each map task generates one linked list. All 385 * lists start on row key 0L. All lists should be CHAIN_LENGTH long. 386 */ 387 public static class LinkedListCreationMapper 388 extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> { 389 390 @Override 391 protected void map(LongWritable key, LongWritable value, Context context) 392 throws IOException, InterruptedException { 393 long chainId = value.get(); 394 LOG.info("Starting mapper with chainId:" + chainId); 395 396 byte[] chainIdArray = Bytes.toBytes(chainId); 397 long currentRow = 0; 398 399 long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH); 400 long nextRow = getNextRow(0, chainLength); 401 byte[] valueBytes = new byte[50]; 402 403 for (long i = 0; i < chainLength; i++) { 404 byte[] rk = Bytes.toBytes(currentRow); 405 406 // Next link in the chain. 407 KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow)); 408 // What link in the chain this is. 409 KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i)); 410 // Added data so that large stores are created. 411 Bytes.random(valueBytes); 412 KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray, valueBytes); 413 414 // Emit the key values. 415 context.write(new ImmutableBytesWritable(rk), linkKv); 416 context.write(new ImmutableBytesWritable(rk), sortKv); 417 context.write(new ImmutableBytesWritable(rk), dataKv); 418 // Move to the next row. 419 currentRow = nextRow; 420 nextRow = getNextRow(i + 1, chainLength); 421 } 422 } 423 424 /** Returns a unique row id within this chain for this index */ 425 private long getNextRow(long index, long chainLength) { 426 long nextRow = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); 427 // use significant bits from the random number, but pad with index to ensure it is unique 428 // this also ensures that we do not reuse row = 0 429 // row collisions from multiple mappers are fine, since we guarantee unique chainIds 430 nextRow = nextRow - (nextRow % chainLength) + index; 431 return nextRow; 432 } 433 } 434 435 /** 436 * Writable class used as the key to group links in the linked list. Used as the key emited from a 437 * pass over the table. 438 */ 439 public static class LinkKey implements WritableComparable<LinkKey> { 440 441 private Long chainId; 442 443 public Long getOrder() { 444 return order; 445 } 446 447 public Long getChainId() { 448 return chainId; 449 } 450 451 private Long order; 452 453 public LinkKey() { 454 } 455 456 public LinkKey(long chainId, long order) { 457 this.chainId = chainId; 458 this.order = order; 459 } 460 461 @Override 462 public int compareTo(LinkKey linkKey) { 463 int res = getChainId().compareTo(linkKey.getChainId()); 464 if (res == 0) { 465 res = getOrder().compareTo(linkKey.getOrder()); 466 } 467 return res; 468 } 469 470 @Override 471 public void write(DataOutput dataOutput) throws IOException { 472 WritableUtils.writeVLong(dataOutput, chainId); 473 WritableUtils.writeVLong(dataOutput, order); 474 } 475 476 @Override 477 public void readFields(DataInput dataInput) throws IOException { 478 chainId = WritableUtils.readVLong(dataInput); 479 order = WritableUtils.readVLong(dataInput); 480 } 481 } 482 483 /** 484 * Writable used as the value emitted from a pass over the hbase table. 485 */ 486 public static class LinkChain implements WritableComparable<LinkChain> { 487 488 public Long getNext() { 489 return next; 490 } 491 492 public Long getRk() { 493 return rk; 494 } 495 496 public LinkChain() { 497 } 498 499 public LinkChain(Long rk, Long next) { 500 this.rk = rk; 501 this.next = next; 502 } 503 504 private Long rk; 505 private Long next; 506 507 @Override 508 public int compareTo(LinkChain linkChain) { 509 int res = getRk().compareTo(linkChain.getRk()); 510 if (res == 0) { 511 res = getNext().compareTo(linkChain.getNext()); 512 } 513 return res; 514 } 515 516 @Override 517 public void write(DataOutput dataOutput) throws IOException { 518 WritableUtils.writeVLong(dataOutput, rk); 519 WritableUtils.writeVLong(dataOutput, next); 520 } 521 522 @Override 523 public void readFields(DataInput dataInput) throws IOException { 524 rk = WritableUtils.readVLong(dataInput); 525 next = WritableUtils.readVLong(dataInput); 526 } 527 } 528 529 /** 530 * Class to figure out what partition to send a link in the chain to. This is based upon the 531 * linkKey's ChainId. 532 */ 533 public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> { 534 @Override 535 public int getPartition(LinkKey linkKey, LinkChain linkChain, int numPartitions) { 536 int hash = linkKey.getChainId().hashCode(); 537 return Math.abs(hash % numPartitions); 538 } 539 } 540 541 /** 542 * Comparator used to figure out if a linkKey should be grouped together. This is based upon the 543 * linkKey's ChainId. 544 */ 545 public static class NaturalKeyGroupingComparator extends WritableComparator { 546 547 protected NaturalKeyGroupingComparator() { 548 super(LinkKey.class, true); 549 } 550 551 @Override 552 public int compare(WritableComparable w1, WritableComparable w2) { 553 LinkKey k1 = (LinkKey) w1; 554 LinkKey k2 = (LinkKey) w2; 555 556 return k1.getChainId().compareTo(k2.getChainId()); 557 } 558 } 559 560 /** 561 * Comparator used to order linkKeys so that they are passed to a reducer in order. This is based 562 * upon linkKey ChainId and Order. 563 */ 564 public static class CompositeKeyComparator extends WritableComparator { 565 566 protected CompositeKeyComparator() { 567 super(LinkKey.class, true); 568 } 569 570 @Override 571 public int compare(WritableComparable w1, WritableComparable w2) { 572 LinkKey k1 = (LinkKey) w1; 573 LinkKey k2 = (LinkKey) w2; 574 575 return k1.compareTo(k2); 576 } 577 } 578 579 /** 580 * Mapper to pass over the table. For every row there could be multiple chains that landed on this 581 * row. So emit a linkKey and value for each. 582 */ 583 public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> { 584 @Override 585 protected void map(ImmutableBytesWritable key, Result value, Context context) 586 throws IOException, InterruptedException { 587 long longRk = Bytes.toLong(value.getRow()); 588 589 for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) { 590 long chainId = Bytes.toLong(entry.getKey()); 591 long next = Bytes.toLong(entry.getValue()); 592 Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0); 593 long order = Bytes.toLong(CellUtil.cloneValue(c)); 594 context.write(new LinkKey(chainId, order), new LinkChain(longRk, next)); 595 } 596 } 597 } 598 599 /** 600 * Class that does the actual checking of the links. All links in the chain should be grouped and 601 * sorted when sent to this class. Then the chain will be traversed making sure that no link is 602 * missing and that the chain is the correct length. This will throw an exception if anything is 603 * not correct. That causes the job to fail if any data is corrupt. 604 */ 605 public static class LinkedListCheckingReducer 606 extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> { 607 @Override 608 protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context) 609 throws java.io.IOException, java.lang.InterruptedException { 610 long next = -1L; 611 long prev = -1L; 612 long count = 0L; 613 614 for (LinkChain lc : values) { 615 616 if (next == -1) { 617 if (lc.getRk() != 0L) { 618 String msg = "Chains should all start at rk 0, but read rk " + lc.getRk() + ". Chain:" 619 + key.chainId + ", order:" + key.order; 620 logError(msg, context); 621 throw new RuntimeException(msg); 622 } 623 next = lc.getNext(); 624 } else { 625 if (next != lc.getRk()) { 626 String msg = "Missing a link in the chain. Prev rk " + prev + " was, expecting " + next 627 + " but got " + lc.getRk() + ". Chain:" + key.chainId + ", order:" + key.order; 628 logError(msg, context); 629 throw new RuntimeException(msg); 630 } 631 prev = lc.getRk(); 632 next = lc.getNext(); 633 } 634 count++; 635 } 636 637 int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH); 638 if (count != expectedChainLen) { 639 String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got " 640 + count + ". Chain:" + key.chainId + ", order:" + key.order; 641 logError(msg, context); 642 throw new RuntimeException(msg); 643 } 644 } 645 646 private static void logError(String msg, Context context) throws IOException { 647 TableName table = getTableName(context.getConfiguration()); 648 649 LOG.error("Failure in chain verification: " + msg); 650 try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration()); 651 Admin admin = connection.getAdmin()) { 652 LOG.error("cluster status:\n" + admin.getClusterStatus()); 653 LOG.error("table regions:\n" + Joiner.on("\n").join(admin.getTableRegions(table))); 654 } 655 } 656 } 657 658 private void runCheckWithRetry() 659 throws IOException, ClassNotFoundException, InterruptedException { 660 try { 661 runCheck(); 662 } catch (Throwable t) { 663 LOG.warn("Received " + StringUtils.stringifyException(t)); 664 LOG.warn("Running the check MR Job again to see whether an ephemeral problem or not"); 665 runCheck(); 666 throw t; // we should still fail the test even if second retry succeeds 667 } 668 // everything green 669 } 670 671 /** 672 * After adding data to the table start a mr job to nnn 673 */ 674 private void runCheck() throws IOException, ClassNotFoundException, InterruptedException { 675 LOG.info("Running check"); 676 Configuration conf = getConf(); 677 String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime(); 678 Path p = util.getDataTestDirOnTestFS(jobName); 679 680 Job job = new Job(conf); 681 job.setJarByClass(getClass()); 682 job.setJobName(jobName); 683 684 job.setPartitionerClass(NaturalKeyPartitioner.class); 685 job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class); 686 job.setSortComparatorClass(CompositeKeyComparator.class); 687 688 Scan scan = new Scan(); 689 scan.addFamily(CHAIN_FAM); 690 scan.addFamily(SORT_FAM); 691 scan.setMaxVersions(1); 692 scan.setCacheBlocks(false); 693 scan.setBatch(1000); 694 695 int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); 696 if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) { 697 scan.setConsistency(Consistency.TIMELINE); 698 } 699 700 TableMapReduceUtil.initTableMapperJob(getTablename().getName(), scan, 701 LinkedListCheckingMapper.class, LinkKey.class, LinkChain.class, job); 702 703 job.setReducerClass(LinkedListCheckingReducer.class); 704 job.setOutputKeyClass(NullWritable.class); 705 job.setOutputValueClass(NullWritable.class); 706 707 FileOutputFormat.setOutputPath(job, p); 708 709 assertEquals(true, job.waitForCompletion(true)); 710 711 // Delete the files. 712 util.getTestFileSystem().delete(p, true); 713 } 714 715 @Override 716 public void setUpCluster() throws Exception { 717 util = getTestingUtil(getConf()); 718 util.initializeCluster(1); 719 int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); 720 if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) { 721 LOG.debug("Region Replicas enabled: " + replicaCount); 722 } 723 724 // Scale this up on a real cluster 725 if (util.isDistributedCluster()) { 726 util.getConfiguration().setIfUnset(NUM_MAPS_KEY, 727 Integer.toString(util.getAdmin().getRegionServers().size() * 10)); 728 util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5"); 729 } else { 730 util.startMiniMapReduceCluster(); 731 } 732 } 733 734 @Override 735 protected void addOptions() { 736 super.addOptions(); 737 super.addOptNoArg(OPT_CHECK, "Run check only"); 738 super.addOptNoArg(OPT_LOAD, "Run load only"); 739 } 740 741 @Override 742 protected void processOptions(CommandLine cmd) { 743 super.processOptions(cmd); 744 check = cmd.hasOption(OPT_CHECK); 745 load = cmd.hasOption(OPT_LOAD); 746 } 747 748 @Override 749 public int runTestFromCommandLine() throws Exception { 750 if (load) { 751 runLoad(); 752 } else if (check) { 753 installSlowingCoproc(); 754 runCheckWithRetry(); 755 } else { 756 testBulkLoad(); 757 } 758 return 0; 759 } 760 761 @Override 762 public TableName getTablename() { 763 return getTableName(getConf()); 764 } 765 766 public static TableName getTableName(Configuration conf) { 767 return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME)); 768 } 769 770 @Override 771 protected Set<String> getColumnFamilies() { 772 return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM), 773 Bytes.toString(SORT_FAM)); 774 } 775 776 public static void main(String[] args) throws Exception { 777 Configuration conf = HBaseConfiguration.create(); 778 IntegrationTestingUtility.setUseDistributedCluster(conf); 779 int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args); 780 System.exit(status); 781 } 782}