001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Optional; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.concurrent.atomic.AtomicInteger; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.CoprocessorEnvironment; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.Waiter; 044import org.apache.hadoop.hbase.codec.KeyValueCodec; 045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 046import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 047import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; 048import org.apache.hadoop.hbase.coprocessor.MasterObserver; 049import org.apache.hadoop.hbase.coprocessor.ObserverContext; 050import org.apache.hadoop.hbase.master.RegionPlan; 051import org.apache.hadoop.hbase.testclassification.FlakeyTests; 052import org.apache.hadoop.hbase.testclassification.MediumTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.JVMClusterUtil; 055import org.apache.hadoop.hbase.util.Threads; 056import org.junit.AfterClass; 057import org.junit.Assert; 058import org.junit.Before; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066@Category({ MediumTests.class, FlakeyTests.class }) 067public class TestMultiParallel { 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestMultiParallel.class); 072 073 private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class); 074 075 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 076 private static final byte[] VALUE = Bytes.toBytes("value"); 077 private static final byte[] QUALIFIER = Bytes.toBytes("qual"); 078 private static final String FAMILY = "family"; 079 private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table"); 080 private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); 081 private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); 082 private static final byte[][] KEYS = makeKeys(); 083 084 private static final int slaves = 5; // also used for testing HTable pool size 085 private static Connection CONNECTION; 086 087 @BeforeClass 088 public static void beforeClass() throws Exception { 089 // Uncomment the following lines if more verbosity is needed for 090 // debugging (see HBASE-12285 for details). 091 // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); 092 // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); 093 // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); 094 UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, 095 KeyValueCodec.class.getCanonicalName()); 096 // Disable table on master for now as the feature is broken 097 // UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true); 098 // We used to ask for system tables on Master exclusively but not needed by test and doesn't 099 // work anyways -- so commented out. 100 // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true); 101 UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 102 MyMasterObserver.class.getName()); 103 UTIL.startMiniCluster(slaves); 104 Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY)); 105 UTIL.waitTableEnabled(TEST_TABLE); 106 t.close(); 107 CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration()); 108 assertTrue(MyMasterObserver.start.get()); 109 } 110 111 @AfterClass 112 public static void afterClass() throws Exception { 113 CONNECTION.close(); 114 UTIL.shutdownMiniCluster(); 115 } 116 117 @Before 118 public void before() throws Exception { 119 final int balanceCount = MyMasterObserver.postBalanceCount.get(); 120 LOG.info("before"); 121 if (UTIL.ensureSomeRegionServersAvailable(slaves)) { 122 // Distribute regions 123 UTIL.getMiniHBaseCluster().getMaster().balance(); 124 // Some plans are created. 125 if (MyMasterObserver.postBalanceCount.get() > balanceCount) { 126 // It is necessary to wait the move procedure to start. 127 // Otherwise, the next wait may pass immediately. 128 UTIL.waitFor(3 * 1000, 100, false, () -> UTIL.getMiniHBaseCluster().getMaster() 129 .getAssignmentManager().hasRegionsInTransition()); 130 } 131 132 // Wait until completing balance 133 UTIL.waitUntilAllRegionsAssigned(TEST_TABLE); 134 } 135 LOG.info("before done"); 136 } 137 138 private static byte[][] makeKeys() { 139 byte[][] starterKeys = HBaseTestingUtility.KEYS; 140 // Create a "non-uniform" test set with the following characteristics: 141 // a) Unequal number of keys per region 142 143 // Don't use integer as a multiple, so that we have a number of keys that is 144 // not a multiple of the number of regions 145 int numKeys = (int) (starterKeys.length * 10.33F); 146 147 List<byte[]> keys = new ArrayList<>(); 148 for (int i = 0; i < numKeys; i++) { 149 int kIdx = i % starterKeys.length; 150 byte[] k = starterKeys[kIdx]; 151 byte[] cp = new byte[k.length + 1]; 152 System.arraycopy(k, 0, cp, 0, k.length); 153 cp[k.length] = new Integer(i % 256).byteValue(); 154 keys.add(cp); 155 } 156 157 // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which 158 // should work) 159 // c) keys are not in sorted order (within a region), to ensure that the 160 // sorting code and index mapping doesn't break the functionality 161 for (int i = 0; i < 100; i++) { 162 int kIdx = i % starterKeys.length; 163 byte[] k = starterKeys[kIdx]; 164 byte[] cp = new byte[k.length + 1]; 165 System.arraycopy(k, 0, cp, 0, k.length); 166 cp[k.length] = new Integer(i % 256).byteValue(); 167 keys.add(cp); 168 } 169 return keys.toArray(new byte[][] { new byte[] {} }); 170 } 171 172 /** 173 * This is for testing the active number of threads that were used while doing a batch operation. 174 * It inserts one row per region via the batch operation, and then checks the number of active 175 * threads. 176 * <p/> 177 * For HBASE-3553 178 */ 179 @Test 180 public void testActiveThreadsCount() throws Exception { 181 UTIL.getConfiguration().setLong("hbase.htable.threads.coresize", slaves + 1); 182 // Make sure max is at least as big as coresize; can be smaller in test context where 183 // we tune down thread sizes -- max could be < slaves + 1. 184 UTIL.getConfiguration().setLong("hbase.htable.threads.max", slaves + 1); 185 try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) { 186 ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration()); 187 try { 188 try (Table t = connection.getTable(TEST_TABLE, executor)) { 189 List<Put> puts = constructPutRequests(); // creates a Put for every region 190 t.batch(puts, null); 191 HashSet<ServerName> regionservers = new HashSet<>(); 192 try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) { 193 for (Row r : puts) { 194 HRegionLocation location = locator.getRegionLocation(r.getRow()); 195 regionservers.add(location.getServerName()); 196 } 197 } 198 assertEquals(regionservers.size(), executor.getLargestPoolSize()); 199 } 200 } finally { 201 executor.shutdownNow(); 202 } 203 } 204 } 205 206 @Test 207 public void testBatchWithGet() throws Exception { 208 LOG.info("test=testBatchWithGet"); 209 Table table = UTIL.getConnection().getTable(TEST_TABLE); 210 211 // load test data 212 List<Put> puts = constructPutRequests(); 213 table.batch(puts, null); 214 215 // create a list of gets and run it 216 List<Row> gets = new ArrayList<>(); 217 for (byte[] k : KEYS) { 218 Get get = new Get(k); 219 get.addColumn(BYTES_FAMILY, QUALIFIER); 220 gets.add(get); 221 } 222 Result[] multiRes = new Result[gets.size()]; 223 table.batch(gets, multiRes); 224 225 // Same gets using individual call API 226 List<Result> singleRes = new ArrayList<>(); 227 for (Row get : gets) { 228 singleRes.add(table.get((Get) get)); 229 } 230 // Compare results 231 Assert.assertEquals(singleRes.size(), multiRes.length); 232 for (int i = 0; i < singleRes.size(); i++) { 233 Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER)); 234 Cell[] singleKvs = singleRes.get(i).rawCells(); 235 Cell[] multiKvs = multiRes[i].rawCells(); 236 for (int j = 0; j < singleKvs.length; j++) { 237 Assert.assertEquals(singleKvs[j], multiKvs[j]); 238 Assert.assertEquals(0, 239 Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), CellUtil.cloneValue(multiKvs[j]))); 240 } 241 } 242 table.close(); 243 } 244 245 @Test 246 public void testBadFam() throws Exception { 247 LOG.info("test=testBadFam"); 248 Table table = UTIL.getConnection().getTable(TEST_TABLE); 249 250 List<Row> actions = new ArrayList<>(); 251 Put p = new Put(Bytes.toBytes("row1")); 252 p.addColumn(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value")); 253 actions.add(p); 254 p = new Put(Bytes.toBytes("row2")); 255 p.addColumn(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value")); 256 actions.add(p); 257 258 // row1 and row2 should be in the same region. 259 260 Object[] r = new Object[actions.size()]; 261 try { 262 table.batch(actions, r); 263 fail(); 264 } catch (RetriesExhaustedWithDetailsException ex) { 265 LOG.debug(ex.toString(), ex); 266 // good! 267 assertFalse(ex.mayHaveClusterIssues()); 268 } 269 assertEquals(2, r.length); 270 assertTrue(r[0] instanceof Throwable); 271 assertTrue(r[1] instanceof Result); 272 table.close(); 273 } 274 275 @Test 276 public void testFlushCommitsNoAbort() throws Exception { 277 LOG.info("test=testFlushCommitsNoAbort"); 278 doTestFlushCommits(false); 279 } 280 281 /** 282 * Only run one Multi test with a forced RegionServer abort. Otherwise, the unit tests will take 283 * an unnecessarily long time to run. 284 */ 285 @Test 286 public void testFlushCommitsWithAbort() throws Exception { 287 LOG.info("test=testFlushCommitsWithAbort"); 288 doTestFlushCommits(true); 289 } 290 291 /** 292 * Set table auto flush to false and test flushing commits 293 * @param doAbort true if abort one regionserver in the testing 294 */ 295 private void doTestFlushCommits(boolean doAbort) throws Exception { 296 // Load the data 297 LOG.info("get new table"); 298 Table table = UTIL.getConnection().getTable(TEST_TABLE); 299 300 LOG.info("constructPutRequests"); 301 List<Put> puts = constructPutRequests(); 302 table.put(puts); 303 LOG.info("puts"); 304 final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size(); 305 assert liveRScount > 0; 306 JVMClusterUtil.RegionServerThread liveRS = 307 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0); 308 if (doAbort) { 309 liveRS.getRegionServer().abort("Aborting for tests", new Exception("doTestFlushCommits")); 310 // If we wait for no regions being online after we abort the server, we 311 // could ensure the master has re-assigned the regions on killed server 312 // after writing successfully. It means the server we aborted is dead 313 // and detected by matser 314 while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) { 315 Thread.sleep(100); 316 } 317 // try putting more keys after the abort. same key/qual... just validating 318 // no exceptions thrown 319 puts = constructPutRequests(); 320 table.put(puts); 321 } 322 323 LOG.info("validating loaded data"); 324 validateLoadedData(table); 325 326 // Validate server and region count 327 List<JVMClusterUtil.RegionServerThread> liveRSs = 328 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads(); 329 int count = 0; 330 for (JVMClusterUtil.RegionServerThread t : liveRSs) { 331 count++; 332 LOG.info("Count=" + count + ", Alive=" + t.getRegionServer()); 333 } 334 LOG.info("Count=" + count); 335 Assert.assertEquals("Server count=" + count + ", abort=" + doAbort, 336 (doAbort ? (liveRScount - 1) : liveRScount), count); 337 if (doAbort) { 338 UTIL.getMiniHBaseCluster().waitOnRegionServer(0); 339 UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() { 340 @Override 341 public boolean evaluate() throws Exception { 342 // We disable regions on master so the count should be liveRScount - 1 343 return UTIL.getMiniHBaseCluster().getMaster().getClusterMetrics().getLiveServerMetrics() 344 .size() == liveRScount - 1; 345 } 346 }); 347 UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition()); 348 } 349 350 table.close(); 351 LOG.info("done"); 352 } 353 354 @Test 355 public void testBatchWithPut() throws Exception { 356 LOG.info("test=testBatchWithPut"); 357 Table table = CONNECTION.getTable(TEST_TABLE); 358 // put multiple rows using a batch 359 List<Put> puts = constructPutRequests(); 360 361 Object[] results = new Object[puts.size()]; 362 table.batch(puts, results); 363 validateSizeAndEmpty(results, KEYS.length); 364 365 if (true) { 366 int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size(); 367 assert liveRScount > 0; 368 JVMClusterUtil.RegionServerThread liveRS = 369 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0); 370 liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut")); 371 puts = constructPutRequests(); 372 try { 373 results = new Object[puts.size()]; 374 table.batch(puts, results); 375 } catch (RetriesExhaustedWithDetailsException ree) { 376 LOG.info(ree.getExhaustiveDescription()); 377 table.close(); 378 throw ree; 379 } 380 validateSizeAndEmpty(results, KEYS.length); 381 } 382 383 validateLoadedData(table); 384 table.close(); 385 } 386 387 @Test 388 public void testBatchWithDelete() throws Exception { 389 LOG.info("test=testBatchWithDelete"); 390 Table table = UTIL.getConnection().getTable(TEST_TABLE); 391 392 // Load some data 393 List<Put> puts = constructPutRequests(); 394 Object[] results = new Object[puts.size()]; 395 table.batch(puts, results); 396 validateSizeAndEmpty(results, KEYS.length); 397 398 // Deletes 399 List<Row> deletes = new ArrayList<>(); 400 for (int i = 0; i < KEYS.length; i++) { 401 Delete delete = new Delete(KEYS[i]); 402 delete.addFamily(BYTES_FAMILY); 403 deletes.add(delete); 404 } 405 results = new Object[deletes.size()]; 406 table.batch(deletes, results); 407 validateSizeAndEmpty(results, KEYS.length); 408 409 // Get to make sure ... 410 for (byte[] k : KEYS) { 411 Get get = new Get(k); 412 get.addColumn(BYTES_FAMILY, QUALIFIER); 413 Assert.assertFalse(table.exists(get)); 414 } 415 table.close(); 416 } 417 418 @Test 419 public void testHTableDeleteWithList() throws Exception { 420 LOG.info("test=testHTableDeleteWithList"); 421 Table table = UTIL.getConnection().getTable(TEST_TABLE); 422 423 // Load some data 424 List<Put> puts = constructPutRequests(); 425 Object[] results = new Object[puts.size()]; 426 table.batch(puts, results); 427 validateSizeAndEmpty(results, KEYS.length); 428 429 // Deletes 430 ArrayList<Delete> deletes = new ArrayList<>(); 431 for (int i = 0; i < KEYS.length; i++) { 432 Delete delete = new Delete(KEYS[i]); 433 delete.addFamily(BYTES_FAMILY); 434 deletes.add(delete); 435 } 436 table.delete(deletes); 437 Assert.assertTrue(deletes.isEmpty()); 438 439 // Get to make sure ... 440 for (byte[] k : KEYS) { 441 Get get = new Get(k); 442 get.addColumn(BYTES_FAMILY, QUALIFIER); 443 Assert.assertFalse(table.exists(get)); 444 } 445 table.close(); 446 } 447 448 @Test 449 public void testBatchWithManyColsInOneRowGetAndPut() throws Exception { 450 LOG.info("test=testBatchWithManyColsInOneRowGetAndPut"); 451 Table table = UTIL.getConnection().getTable(TEST_TABLE); 452 453 List<Row> puts = new ArrayList<>(); 454 for (int i = 0; i < 100; i++) { 455 Put put = new Put(ONE_ROW); 456 byte[] qual = Bytes.toBytes("column" + i); 457 put.addColumn(BYTES_FAMILY, qual, VALUE); 458 puts.add(put); 459 } 460 Object[] results = new Object[puts.size()]; 461 table.batch(puts, results); 462 463 // validate 464 validateSizeAndEmpty(results, 100); 465 466 // get the data back and validate that it is correct 467 List<Row> gets = new ArrayList<>(); 468 for (int i = 0; i < 100; i++) { 469 Get get = new Get(ONE_ROW); 470 byte[] qual = Bytes.toBytes("column" + i); 471 get.addColumn(BYTES_FAMILY, qual); 472 gets.add(get); 473 } 474 475 Object[] multiRes = new Object[gets.size()]; 476 table.batch(gets, multiRes); 477 478 int idx = 0; 479 for (Object r : multiRes) { 480 byte[] qual = Bytes.toBytes("column" + idx); 481 validateResult(r, qual, VALUE); 482 idx++; 483 } 484 table.close(); 485 } 486 487 @Test 488 public void testBatchWithIncrementAndAppend() throws Exception { 489 LOG.info("test=testBatchWithIncrementAndAppend"); 490 final byte[] QUAL1 = Bytes.toBytes("qual1"); 491 final byte[] QUAL2 = Bytes.toBytes("qual2"); 492 final byte[] QUAL3 = Bytes.toBytes("qual3"); 493 final byte[] QUAL4 = Bytes.toBytes("qual4"); 494 Table table = UTIL.getConnection().getTable(TEST_TABLE); 495 Delete d = new Delete(ONE_ROW); 496 table.delete(d); 497 Put put = new Put(ONE_ROW); 498 put.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc")); 499 put.addColumn(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L)); 500 table.put(put); 501 502 Increment inc = new Increment(ONE_ROW); 503 inc.addColumn(BYTES_FAMILY, QUAL2, 1); 504 inc.addColumn(BYTES_FAMILY, QUAL3, 1); 505 506 Append a = new Append(ONE_ROW); 507 a.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("def")); 508 a.addColumn(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz")); 509 List<Row> actions = new ArrayList<>(); 510 actions.add(inc); 511 actions.add(a); 512 513 Object[] multiRes = new Object[actions.size()]; 514 table.batch(actions, multiRes); 515 validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef")); 516 validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz")); 517 validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L)); 518 validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L)); 519 table.close(); 520 } 521 522 @Test 523 public void testNonceCollision() throws Exception { 524 LOG.info("test=testNonceCollision"); 525 final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); 526 Table table = connection.getTable(TEST_TABLE); 527 Put put = new Put(ONE_ROW); 528 put.addColumn(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L)); 529 530 // Replace nonce manager with the one that returns each nonce twice. 531 NonceGenerator cnm = new NonceGenerator() { 532 533 private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get(); 534 535 private long lastNonce = -1; 536 537 @Override 538 public synchronized long newNonce() { 539 long nonce = 0; 540 if (lastNonce == -1) { 541 lastNonce = nonce = delegate.newNonce(); 542 } else { 543 nonce = lastNonce; 544 lastNonce = -1L; 545 } 546 return nonce; 547 } 548 549 @Override 550 public long getNonceGroup() { 551 return delegate.getNonceGroup(); 552 } 553 }; 554 555 NonceGenerator oldCnm = 556 ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection) connection, cnm); 557 558 // First test sequential requests. 559 try { 560 Increment inc = new Increment(ONE_ROW); 561 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 562 table.increment(inc); 563 564 // duplicate increment 565 inc = new Increment(ONE_ROW); 566 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 567 Result result = table.increment(inc); 568 validateResult(result, QUALIFIER, Bytes.toBytes(1L)); 569 570 Get get = new Get(ONE_ROW); 571 get.addColumn(BYTES_FAMILY, QUALIFIER); 572 result = table.get(get); 573 validateResult(result, QUALIFIER, Bytes.toBytes(1L)); 574 575 // Now run a bunch of requests in parallel, exactly half should succeed. 576 int numRequests = 40; 577 final CountDownLatch startedLatch = new CountDownLatch(numRequests); 578 final CountDownLatch startLatch = new CountDownLatch(1); 579 final CountDownLatch doneLatch = new CountDownLatch(numRequests); 580 for (int i = 0; i < numRequests; ++i) { 581 Runnable r = new Runnable() { 582 @Override 583 public void run() { 584 Table table = null; 585 try { 586 table = connection.getTable(TEST_TABLE); 587 } catch (IOException e) { 588 fail("Not expected"); 589 } 590 Increment inc = new Increment(ONE_ROW); 591 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 592 startedLatch.countDown(); 593 try { 594 startLatch.await(); 595 } catch (InterruptedException e) { 596 fail("Not expected"); 597 } 598 try { 599 table.increment(inc); 600 } catch (IOException ioEx) { 601 fail("Not expected"); 602 } 603 doneLatch.countDown(); 604 } 605 }; 606 Threads.setDaemonThreadRunning(new Thread(r)); 607 } 608 startedLatch.await(); // Wait until all threads are ready... 609 startLatch.countDown(); // ...and unleash the herd! 610 doneLatch.await(); 611 // Now verify 612 get = new Get(ONE_ROW); 613 get.addColumn(BYTES_FAMILY, QUALIFIER); 614 result = table.get(get); 615 validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L)); 616 table.close(); 617 } finally { 618 ConnectionImplementation.injectNonceGeneratorForTesting((ClusterConnection) connection, 619 oldCnm); 620 } 621 } 622 623 @Test 624 public void testBatchWithMixedActions() throws Exception { 625 LOG.info("test=testBatchWithMixedActions"); 626 Table table = UTIL.getConnection().getTable(TEST_TABLE); 627 628 // Load some data to start 629 List<Put> puts = constructPutRequests(); 630 Object[] results = new Object[puts.size()]; 631 table.batch(puts, results); 632 validateSizeAndEmpty(results, KEYS.length); 633 634 // Batch: get, get, put(new col), delete, get, get of put, get of deleted, 635 // put 636 List<Row> actions = new ArrayList<>(); 637 638 byte[] qual2 = Bytes.toBytes("qual2"); 639 byte[] val2 = Bytes.toBytes("putvalue2"); 640 641 // 0 get 642 Get get = new Get(KEYS[10]); 643 get.addColumn(BYTES_FAMILY, QUALIFIER); 644 actions.add(get); 645 646 // 1 get 647 get = new Get(KEYS[11]); 648 get.addColumn(BYTES_FAMILY, QUALIFIER); 649 actions.add(get); 650 651 // 2 put of new column 652 Put put = new Put(KEYS[10]); 653 put.addColumn(BYTES_FAMILY, qual2, val2); 654 actions.add(put); 655 656 // 3 delete 657 Delete delete = new Delete(KEYS[20]); 658 delete.addFamily(BYTES_FAMILY); 659 actions.add(delete); 660 661 // 4 get 662 get = new Get(KEYS[30]); 663 get.addColumn(BYTES_FAMILY, QUALIFIER); 664 actions.add(get); 665 666 // There used to be a 'get' of a previous put here, but removed 667 // since this API really cannot guarantee order in terms of mixed 668 // get/puts. 669 670 // 5 put of new column 671 put = new Put(KEYS[40]); 672 put.addColumn(BYTES_FAMILY, qual2, val2); 673 actions.add(put); 674 675 // 6 RowMutations 676 RowMutations rm = new RowMutations(KEYS[50]); 677 put = new Put(KEYS[50]); 678 put.addColumn(BYTES_FAMILY, qual2, val2); 679 rm.add((Mutation) put); 680 byte[] qual3 = Bytes.toBytes("qual3"); 681 byte[] val3 = Bytes.toBytes("putvalue3"); 682 put = new Put(KEYS[50]); 683 put.addColumn(BYTES_FAMILY, qual3, val3); 684 rm.add((Mutation) put); 685 actions.add(rm); 686 687 // 7 Add another Get to the mixed sequence after RowMutations 688 get = new Get(KEYS[10]); 689 get.addColumn(BYTES_FAMILY, QUALIFIER); 690 actions.add(get); 691 692 results = new Object[actions.size()]; 693 table.batch(actions, results); 694 695 // Validation 696 697 validateResult(results[0]); 698 validateResult(results[1]); 699 validateEmpty(results[3]); 700 validateResult(results[4]); 701 validateEmpty(results[5]); 702 validateEmpty(results[6]); 703 validateResult(results[7]); 704 705 // validate last put, externally from the batch 706 get = new Get(KEYS[40]); 707 get.addColumn(BYTES_FAMILY, qual2); 708 Result r = table.get(get); 709 validateResult(r, qual2, val2); 710 711 // validate last RowMutations, externally from the batch 712 get = new Get(KEYS[50]); 713 get.addColumn(BYTES_FAMILY, qual2); 714 r = table.get(get); 715 validateResult(r, qual2, val2); 716 717 get = new Get(KEYS[50]); 718 get.addColumn(BYTES_FAMILY, qual3); 719 r = table.get(get); 720 validateResult(r, qual3, val3); 721 722 table.close(); 723 } 724 725 // // Helper methods //// 726 727 private void validateResult(Object r) { 728 validateResult(r, QUALIFIER, VALUE); 729 } 730 731 private void validateResult(Object r1, byte[] qual, byte[] val) { 732 Result r = (Result) r1; 733 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual)); 734 byte[] value = r.getValue(BYTES_FAMILY, qual); 735 if (0 != Bytes.compareTo(val, value)) { 736 fail("Expected [" + Bytes.toStringBinary(val) + "] but got [" + Bytes.toStringBinary(value) 737 + "]"); 738 } 739 } 740 741 private List<Put> constructPutRequests() { 742 List<Put> puts = new ArrayList<>(); 743 for (byte[] k : KEYS) { 744 Put put = new Put(k); 745 put.addColumn(BYTES_FAMILY, QUALIFIER, VALUE); 746 puts.add(put); 747 } 748 return puts; 749 } 750 751 private void validateLoadedData(Table table) throws IOException { 752 // get the data back and validate that it is correct 753 LOG.info("Validating data on " + table); 754 List<Get> gets = new ArrayList<>(); 755 for (byte[] k : KEYS) { 756 Get get = new Get(k); 757 get.addColumn(BYTES_FAMILY, QUALIFIER); 758 gets.add(get); 759 } 760 int retryNum = 10; 761 Result[] results = null; 762 do { 763 results = table.get(gets); 764 boolean finished = true; 765 for (Result result : results) { 766 if (result.isEmpty()) { 767 finished = false; 768 break; 769 } 770 } 771 if (finished) { 772 break; 773 } 774 try { 775 Thread.sleep(10); 776 } catch (InterruptedException e) { 777 } 778 retryNum--; 779 } while (retryNum > 0); 780 781 if (retryNum == 0) { 782 fail("Timeout for validate data"); 783 } else { 784 if (results != null) { 785 for (Result r : results) { 786 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); 787 Assert.assertEquals(0, Bytes.compareTo(VALUE, r.getValue(BYTES_FAMILY, QUALIFIER))); 788 } 789 LOG.info("Validating data on " + table + " successfully!"); 790 } 791 } 792 } 793 794 private void validateEmpty(Object r1) { 795 Result result = (Result) r1; 796 Assert.assertTrue(result != null); 797 Assert.assertTrue(result.isEmpty()); 798 } 799 800 private void validateSizeAndEmpty(Object[] results, int expectedSize) { 801 // Validate got back the same number of Result objects, all empty 802 Assert.assertEquals(expectedSize, results.length); 803 for (Object result : results) { 804 validateEmpty(result); 805 } 806 } 807 808 public static class MyMasterObserver implements MasterObserver, MasterCoprocessor { 809 private static final AtomicInteger postBalanceCount = new AtomicInteger(0); 810 private static final AtomicBoolean start = new AtomicBoolean(false); 811 812 @Override 813 public void start(CoprocessorEnvironment env) throws IOException { 814 start.set(true); 815 } 816 817 @Override 818 public Optional<MasterObserver> getMasterObserver() { 819 return Optional.of(this); 820 } 821 822 @Override 823 public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx, 824 BalanceRequest request, List<RegionPlan> plans) throws IOException { 825 if (!plans.isEmpty()) { 826 postBalanceCount.incrementAndGet(); 827 } 828 } 829 } 830}