001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Optional; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.concurrent.atomic.AtomicInteger; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.CoprocessorEnvironment; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.Waiter; 044import org.apache.hadoop.hbase.codec.KeyValueCodec; 045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 046import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 047import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; 048import org.apache.hadoop.hbase.coprocessor.MasterObserver; 049import org.apache.hadoop.hbase.coprocessor.ObserverContext; 050import org.apache.hadoop.hbase.master.RegionPlan; 051import org.apache.hadoop.hbase.testclassification.FlakeyTests; 052import org.apache.hadoop.hbase.testclassification.MediumTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.JVMClusterUtil; 055import org.apache.hadoop.hbase.util.Threads; 056import org.junit.AfterClass; 057import org.junit.Assert; 058import org.junit.Before; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066@Category({MediumTests.class, FlakeyTests.class}) 067public class TestMultiParallel { 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestMultiParallel.class); 072 073 private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class); 074 075 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 076 private static final byte[] VALUE = Bytes.toBytes("value"); 077 private static final byte[] QUALIFIER = Bytes.toBytes("qual"); 078 private static final String FAMILY = "family"; 079 private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table"); 080 private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); 081 private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); 082 private static final byte [][] KEYS = makeKeys(); 083 084 private static final int slaves = 5; // also used for testing HTable pool size 085 private static Connection CONNECTION; 086 087 @BeforeClass 088 public static void beforeClass() throws Exception { 089 // Uncomment the following lines if more verbosity is needed for 090 // debugging (see HBASE-12285 for details). 091 //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); 092 //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); 093 //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); 094 UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, 095 KeyValueCodec.class.getCanonicalName()); 096 // Disable table on master for now as the feature is broken 097 //UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true); 098 // We used to ask for system tables on Master exclusively but not needed by test and doesn't 099 // work anyways -- so commented out. 100 // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true); 101 UTIL.getConfiguration() 102 .set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyMasterObserver.class.getName()); 103 UTIL.startMiniCluster(slaves); 104 Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY)); 105 UTIL.waitTableEnabled(TEST_TABLE); 106 t.close(); 107 CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration()); 108 assertTrue(MyMasterObserver.start.get()); 109 } 110 111 @AfterClass 112 public static void afterClass() throws Exception { 113 CONNECTION.close(); 114 UTIL.shutdownMiniCluster(); 115 } 116 117 @Before 118 public void before() throws Exception { 119 final int balanceCount = MyMasterObserver.postBalanceCount.get(); 120 LOG.info("before"); 121 if (UTIL.ensureSomeRegionServersAvailable(slaves)) { 122 // Distribute regions 123 UTIL.getMiniHBaseCluster().getMaster().balance(); 124 // Some plans are created. 125 if (MyMasterObserver.postBalanceCount.get() > balanceCount) { 126 // It is necessary to wait the move procedure to start. 127 // Otherwise, the next wait may pass immediately. 128 UTIL.waitFor(3 * 1000, 100, false, () -> 129 UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition() 130 ); 131 } 132 133 // Wait until completing balance 134 UTIL.waitUntilAllRegionsAssigned(TEST_TABLE); 135 } 136 LOG.info("before done"); 137 } 138 139 private static byte[][] makeKeys() { 140 byte [][] starterKeys = HBaseTestingUtility.KEYS; 141 // Create a "non-uniform" test set with the following characteristics: 142 // a) Unequal number of keys per region 143 144 // Don't use integer as a multiple, so that we have a number of keys that is 145 // not a multiple of the number of regions 146 int numKeys = (int) (starterKeys.length * 10.33F); 147 148 List<byte[]> keys = new ArrayList<>(); 149 for (int i = 0; i < numKeys; i++) { 150 int kIdx = i % starterKeys.length; 151 byte[] k = starterKeys[kIdx]; 152 byte[] cp = new byte[k.length + 1]; 153 System.arraycopy(k, 0, cp, 0, k.length); 154 cp[k.length] = new Integer(i % 256).byteValue(); 155 keys.add(cp); 156 } 157 158 // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which 159 // should work) 160 // c) keys are not in sorted order (within a region), to ensure that the 161 // sorting code and index mapping doesn't break the functionality 162 for (int i = 0; i < 100; i++) { 163 int kIdx = i % starterKeys.length; 164 byte[] k = starterKeys[kIdx]; 165 byte[] cp = new byte[k.length + 1]; 166 System.arraycopy(k, 0, cp, 0, k.length); 167 cp[k.length] = new Integer(i % 256).byteValue(); 168 keys.add(cp); 169 } 170 return keys.toArray(new byte [][] {new byte [] {}}); 171 } 172 173 174 /** 175 * This is for testing the active number of threads that were used while 176 * doing a batch operation. It inserts one row per region via the batch 177 * operation, and then checks the number of active threads. 178 * <p/> 179 * For HBASE-3553 180 */ 181 @Test 182 public void testActiveThreadsCount() throws Exception { 183 UTIL.getConfiguration().setLong("hbase.htable.threads.coresize", slaves + 1); 184 try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) { 185 ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration()); 186 try { 187 try (Table t = connection.getTable(TEST_TABLE, executor)) { 188 List<Put> puts = constructPutRequests(); // creates a Put for every region 189 t.batch(puts, null); 190 HashSet<ServerName> regionservers = new HashSet<>(); 191 try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) { 192 for (Row r : puts) { 193 HRegionLocation location = locator.getRegionLocation(r.getRow()); 194 regionservers.add(location.getServerName()); 195 } 196 } 197 assertEquals(regionservers.size(), executor.getLargestPoolSize()); 198 } 199 } finally { 200 executor.shutdownNow(); 201 } 202 } 203 } 204 205 @Test 206 public void testBatchWithGet() throws Exception { 207 LOG.info("test=testBatchWithGet"); 208 Table table = UTIL.getConnection().getTable(TEST_TABLE); 209 210 // load test data 211 List<Put> puts = constructPutRequests(); 212 table.batch(puts, null); 213 214 // create a list of gets and run it 215 List<Row> gets = new ArrayList<>(); 216 for (byte[] k : KEYS) { 217 Get get = new Get(k); 218 get.addColumn(BYTES_FAMILY, QUALIFIER); 219 gets.add(get); 220 } 221 Result[] multiRes = new Result[gets.size()]; 222 table.batch(gets, multiRes); 223 224 // Same gets using individual call API 225 List<Result> singleRes = new ArrayList<>(); 226 for (Row get : gets) { 227 singleRes.add(table.get((Get) get)); 228 } 229 // Compare results 230 Assert.assertEquals(singleRes.size(), multiRes.length); 231 for (int i = 0; i < singleRes.size(); i++) { 232 Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER)); 233 Cell[] singleKvs = singleRes.get(i).rawCells(); 234 Cell[] multiKvs = multiRes[i].rawCells(); 235 for (int j = 0; j < singleKvs.length; j++) { 236 Assert.assertEquals(singleKvs[j], multiKvs[j]); 237 Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), 238 CellUtil.cloneValue(multiKvs[j]))); 239 } 240 } 241 table.close(); 242 } 243 244 @Test 245 public void testBadFam() throws Exception { 246 LOG.info("test=testBadFam"); 247 Table table = UTIL.getConnection().getTable(TEST_TABLE); 248 249 List<Row> actions = new ArrayList<>(); 250 Put p = new Put(Bytes.toBytes("row1")); 251 p.addColumn(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value")); 252 actions.add(p); 253 p = new Put(Bytes.toBytes("row2")); 254 p.addColumn(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value")); 255 actions.add(p); 256 257 // row1 and row2 should be in the same region. 258 259 Object [] r = new Object[actions.size()]; 260 try { 261 table.batch(actions, r); 262 fail(); 263 } catch (RetriesExhaustedWithDetailsException ex) { 264 LOG.debug(ex.toString(), ex); 265 // good! 266 assertFalse(ex.mayHaveClusterIssues()); 267 } 268 assertEquals(2, r.length); 269 assertTrue(r[0] instanceof Throwable); 270 assertTrue(r[1] instanceof Result); 271 table.close(); 272 } 273 274 @Test 275 public void testFlushCommitsNoAbort() throws Exception { 276 LOG.info("test=testFlushCommitsNoAbort"); 277 doTestFlushCommits(false); 278 } 279 280 /** 281 * Only run one Multi test with a forced RegionServer abort. Otherwise, the 282 * unit tests will take an unnecessarily long time to run. 283 */ 284 @Test 285 public void testFlushCommitsWithAbort() throws Exception { 286 LOG.info("test=testFlushCommitsWithAbort"); 287 doTestFlushCommits(true); 288 } 289 290 /** 291 * Set table auto flush to false and test flushing commits 292 * @param doAbort true if abort one regionserver in the testing 293 */ 294 private void doTestFlushCommits(boolean doAbort) throws Exception { 295 // Load the data 296 LOG.info("get new table"); 297 Table table = UTIL.getConnection().getTable(TEST_TABLE); 298 299 LOG.info("constructPutRequests"); 300 List<Put> puts = constructPutRequests(); 301 table.put(puts); 302 LOG.info("puts"); 303 final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads() 304 .size(); 305 assert liveRScount > 0; 306 JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster() 307 .getLiveRegionServerThreads().get(0); 308 if (doAbort) { 309 liveRS.getRegionServer().abort("Aborting for tests", 310 new Exception("doTestFlushCommits")); 311 // If we wait for no regions being online after we abort the server, we 312 // could ensure the master has re-assigned the regions on killed server 313 // after writing successfully. It means the server we aborted is dead 314 // and detected by matser 315 while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) { 316 Thread.sleep(100); 317 } 318 // try putting more keys after the abort. same key/qual... just validating 319 // no exceptions thrown 320 puts = constructPutRequests(); 321 table.put(puts); 322 } 323 324 LOG.info("validating loaded data"); 325 validateLoadedData(table); 326 327 // Validate server and region count 328 List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads(); 329 int count = 0; 330 for (JVMClusterUtil.RegionServerThread t: liveRSs) { 331 count++; 332 LOG.info("Count=" + count + ", Alive=" + t.getRegionServer()); 333 } 334 LOG.info("Count=" + count); 335 Assert.assertEquals("Server count=" + count + ", abort=" + doAbort, 336 (doAbort ? (liveRScount - 1) : liveRScount), count); 337 if (doAbort) { 338 UTIL.getMiniHBaseCluster().waitOnRegionServer(0); 339 UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() { 340 @Override 341 public boolean evaluate() throws Exception { 342 // We disable regions on master so the count should be liveRScount - 1 343 return UTIL.getMiniHBaseCluster().getMaster() 344 .getClusterMetrics().getLiveServerMetrics().size() == liveRScount - 1; 345 } 346 }); 347 UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition()); 348 } 349 350 table.close(); 351 LOG.info("done"); 352 } 353 354 @Test 355 public void testBatchWithPut() throws Exception { 356 LOG.info("test=testBatchWithPut"); 357 Table table = CONNECTION.getTable(TEST_TABLE); 358 // put multiple rows using a batch 359 List<Put> puts = constructPutRequests(); 360 361 Object[] results = new Object[puts.size()]; 362 table.batch(puts, results); 363 validateSizeAndEmpty(results, KEYS.length); 364 365 if (true) { 366 int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size(); 367 assert liveRScount > 0; 368 JVMClusterUtil.RegionServerThread liveRS = 369 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0); 370 liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut")); 371 puts = constructPutRequests(); 372 try { 373 results = new Object[puts.size()]; 374 table.batch(puts, results); 375 } catch (RetriesExhaustedWithDetailsException ree) { 376 LOG.info(ree.getExhaustiveDescription()); 377 table.close(); 378 throw ree; 379 } 380 validateSizeAndEmpty(results, KEYS.length); 381 } 382 383 validateLoadedData(table); 384 table.close(); 385 } 386 387 @Test 388 public void testBatchWithDelete() throws Exception { 389 LOG.info("test=testBatchWithDelete"); 390 Table table = UTIL.getConnection().getTable(TEST_TABLE); 391 392 // Load some data 393 List<Put> puts = constructPutRequests(); 394 Object[] results = new Object[puts.size()]; 395 table.batch(puts, results); 396 validateSizeAndEmpty(results, KEYS.length); 397 398 // Deletes 399 List<Row> deletes = new ArrayList<>(); 400 for (int i = 0; i < KEYS.length; i++) { 401 Delete delete = new Delete(KEYS[i]); 402 delete.addFamily(BYTES_FAMILY); 403 deletes.add(delete); 404 } 405 results= new Object[deletes.size()]; 406 table.batch(deletes, results); 407 validateSizeAndEmpty(results, KEYS.length); 408 409 // Get to make sure ... 410 for (byte[] k : KEYS) { 411 Get get = new Get(k); 412 get.addColumn(BYTES_FAMILY, QUALIFIER); 413 Assert.assertFalse(table.exists(get)); 414 } 415 table.close(); 416 } 417 418 @Test 419 public void testHTableDeleteWithList() throws Exception { 420 LOG.info("test=testHTableDeleteWithList"); 421 Table table = UTIL.getConnection().getTable(TEST_TABLE); 422 423 // Load some data 424 List<Put> puts = constructPutRequests(); 425 Object[] results = new Object[puts.size()]; 426 table.batch(puts, results); 427 validateSizeAndEmpty(results, KEYS.length); 428 429 // Deletes 430 ArrayList<Delete> deletes = new ArrayList<>(); 431 for (int i = 0; i < KEYS.length; i++) { 432 Delete delete = new Delete(KEYS[i]); 433 delete.addFamily(BYTES_FAMILY); 434 deletes.add(delete); 435 } 436 table.delete(deletes); 437 Assert.assertTrue(deletes.isEmpty()); 438 439 // Get to make sure ... 440 for (byte[] k : KEYS) { 441 Get get = new Get(k); 442 get.addColumn(BYTES_FAMILY, QUALIFIER); 443 Assert.assertFalse(table.exists(get)); 444 } 445 table.close(); 446 } 447 448 @Test 449 public void testBatchWithManyColsInOneRowGetAndPut() throws Exception { 450 LOG.info("test=testBatchWithManyColsInOneRowGetAndPut"); 451 Table table = UTIL.getConnection().getTable(TEST_TABLE); 452 453 List<Row> puts = new ArrayList<>(); 454 for (int i = 0; i < 100; i++) { 455 Put put = new Put(ONE_ROW); 456 byte[] qual = Bytes.toBytes("column" + i); 457 put.addColumn(BYTES_FAMILY, qual, VALUE); 458 puts.add(put); 459 } 460 Object[] results = new Object[puts.size()]; 461 table.batch(puts, results); 462 463 // validate 464 validateSizeAndEmpty(results, 100); 465 466 // get the data back and validate that it is correct 467 List<Row> gets = new ArrayList<>(); 468 for (int i = 0; i < 100; i++) { 469 Get get = new Get(ONE_ROW); 470 byte[] qual = Bytes.toBytes("column" + i); 471 get.addColumn(BYTES_FAMILY, qual); 472 gets.add(get); 473 } 474 475 Object[] multiRes = new Object[gets.size()]; 476 table.batch(gets, multiRes); 477 478 int idx = 0; 479 for (Object r : multiRes) { 480 byte[] qual = Bytes.toBytes("column" + idx); 481 validateResult(r, qual, VALUE); 482 idx++; 483 } 484 table.close(); 485 } 486 487 @Test 488 public void testBatchWithIncrementAndAppend() throws Exception { 489 LOG.info("test=testBatchWithIncrementAndAppend"); 490 final byte[] QUAL1 = Bytes.toBytes("qual1"); 491 final byte[] QUAL2 = Bytes.toBytes("qual2"); 492 final byte[] QUAL3 = Bytes.toBytes("qual3"); 493 final byte[] QUAL4 = Bytes.toBytes("qual4"); 494 Table table = UTIL.getConnection().getTable(TEST_TABLE); 495 Delete d = new Delete(ONE_ROW); 496 table.delete(d); 497 Put put = new Put(ONE_ROW); 498 put.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc")); 499 put.addColumn(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L)); 500 table.put(put); 501 502 Increment inc = new Increment(ONE_ROW); 503 inc.addColumn(BYTES_FAMILY, QUAL2, 1); 504 inc.addColumn(BYTES_FAMILY, QUAL3, 1); 505 506 Append a = new Append(ONE_ROW); 507 a.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("def")); 508 a.addColumn(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz")); 509 List<Row> actions = new ArrayList<>(); 510 actions.add(inc); 511 actions.add(a); 512 513 Object[] multiRes = new Object[actions.size()]; 514 table.batch(actions, multiRes); 515 validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef")); 516 validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz")); 517 validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L)); 518 validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L)); 519 table.close(); 520 } 521 522 @Test 523 public void testNonceCollision() throws Exception { 524 LOG.info("test=testNonceCollision"); 525 final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); 526 Table table = connection.getTable(TEST_TABLE); 527 Put put = new Put(ONE_ROW); 528 put.addColumn(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L)); 529 530 // Replace nonce manager with the one that returns each nonce twice. 531 NonceGenerator cnm = new NonceGenerator() { 532 533 private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get(); 534 535 private long lastNonce = -1; 536 537 @Override 538 public synchronized long newNonce() { 539 long nonce = 0; 540 if (lastNonce == -1) { 541 lastNonce = nonce = delegate.newNonce(); 542 } else { 543 nonce = lastNonce; 544 lastNonce = -1L; 545 } 546 return nonce; 547 } 548 549 @Override 550 public long getNonceGroup() { 551 return delegate.getNonceGroup(); 552 } 553 }; 554 555 NonceGenerator oldCnm = 556 ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm); 557 558 // First test sequential requests. 559 try { 560 Increment inc = new Increment(ONE_ROW); 561 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 562 table.increment(inc); 563 564 // duplicate increment 565 inc = new Increment(ONE_ROW); 566 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 567 Result result = table.increment(inc); 568 validateResult(result, QUALIFIER, Bytes.toBytes(1L)); 569 570 Get get = new Get(ONE_ROW); 571 get.addColumn(BYTES_FAMILY, QUALIFIER); 572 result = table.get(get); 573 validateResult(result, QUALIFIER, Bytes.toBytes(1L)); 574 575 // Now run a bunch of requests in parallel, exactly half should succeed. 576 int numRequests = 40; 577 final CountDownLatch startedLatch = new CountDownLatch(numRequests); 578 final CountDownLatch startLatch = new CountDownLatch(1); 579 final CountDownLatch doneLatch = new CountDownLatch(numRequests); 580 for (int i = 0; i < numRequests; ++i) { 581 Runnable r = new Runnable() { 582 @Override 583 public void run() { 584 Table table = null; 585 try { 586 table = connection.getTable(TEST_TABLE); 587 } catch (IOException e) { 588 fail("Not expected"); 589 } 590 Increment inc = new Increment(ONE_ROW); 591 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 592 startedLatch.countDown(); 593 try { 594 startLatch.await(); 595 } catch (InterruptedException e) { 596 fail("Not expected"); 597 } 598 try { 599 table.increment(inc); 600 } catch (IOException ioEx) { 601 fail("Not expected"); 602 } 603 doneLatch.countDown(); 604 } 605 }; 606 Threads.setDaemonThreadRunning(new Thread(r)); 607 } 608 startedLatch.await(); // Wait until all threads are ready... 609 startLatch.countDown(); // ...and unleash the herd! 610 doneLatch.await(); 611 // Now verify 612 get = new Get(ONE_ROW); 613 get.addColumn(BYTES_FAMILY, QUALIFIER); 614 result = table.get(get); 615 validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L)); 616 table.close(); 617 } finally { 618 ConnectionImplementation.injectNonceGeneratorForTesting((ClusterConnection) connection, oldCnm); 619 } 620 } 621 622 @Test 623 public void testBatchWithMixedActions() throws Exception { 624 LOG.info("test=testBatchWithMixedActions"); 625 Table table = UTIL.getConnection().getTable(TEST_TABLE); 626 627 // Load some data to start 628 List<Put> puts = constructPutRequests(); 629 Object[] results = new Object[puts.size()]; 630 table.batch(puts, results); 631 validateSizeAndEmpty(results, KEYS.length); 632 633 // Batch: get, get, put(new col), delete, get, get of put, get of deleted, 634 // put 635 List<Row> actions = new ArrayList<>(); 636 637 byte[] qual2 = Bytes.toBytes("qual2"); 638 byte[] val2 = Bytes.toBytes("putvalue2"); 639 640 // 0 get 641 Get get = new Get(KEYS[10]); 642 get.addColumn(BYTES_FAMILY, QUALIFIER); 643 actions.add(get); 644 645 // 1 get 646 get = new Get(KEYS[11]); 647 get.addColumn(BYTES_FAMILY, QUALIFIER); 648 actions.add(get); 649 650 // 2 put of new column 651 Put put = new Put(KEYS[10]); 652 put.addColumn(BYTES_FAMILY, qual2, val2); 653 actions.add(put); 654 655 // 3 delete 656 Delete delete = new Delete(KEYS[20]); 657 delete.addFamily(BYTES_FAMILY); 658 actions.add(delete); 659 660 // 4 get 661 get = new Get(KEYS[30]); 662 get.addColumn(BYTES_FAMILY, QUALIFIER); 663 actions.add(get); 664 665 // There used to be a 'get' of a previous put here, but removed 666 // since this API really cannot guarantee order in terms of mixed 667 // get/puts. 668 669 // 5 put of new column 670 put = new Put(KEYS[40]); 671 put.addColumn(BYTES_FAMILY, qual2, val2); 672 actions.add(put); 673 674 // 6 RowMutations 675 RowMutations rm = new RowMutations(KEYS[50]); 676 put = new Put(KEYS[50]); 677 put.addColumn(BYTES_FAMILY, qual2, val2); 678 rm.add((Mutation) put); 679 byte[] qual3 = Bytes.toBytes("qual3"); 680 byte[] val3 = Bytes.toBytes("putvalue3"); 681 put = new Put(KEYS[50]); 682 put.addColumn(BYTES_FAMILY, qual3, val3); 683 rm.add((Mutation) put); 684 actions.add(rm); 685 686 // 7 Add another Get to the mixed sequence after RowMutations 687 get = new Get(KEYS[10]); 688 get.addColumn(BYTES_FAMILY, QUALIFIER); 689 actions.add(get); 690 691 results = new Object[actions.size()]; 692 table.batch(actions, results); 693 694 // Validation 695 696 validateResult(results[0]); 697 validateResult(results[1]); 698 validateEmpty(results[3]); 699 validateResult(results[4]); 700 validateEmpty(results[5]); 701 validateEmpty(results[6]); 702 validateResult(results[7]); 703 704 // validate last put, externally from the batch 705 get = new Get(KEYS[40]); 706 get.addColumn(BYTES_FAMILY, qual2); 707 Result r = table.get(get); 708 validateResult(r, qual2, val2); 709 710 // validate last RowMutations, externally from the batch 711 get = new Get(KEYS[50]); 712 get.addColumn(BYTES_FAMILY, qual2); 713 r = table.get(get); 714 validateResult(r, qual2, val2); 715 716 get = new Get(KEYS[50]); 717 get.addColumn(BYTES_FAMILY, qual3); 718 r = table.get(get); 719 validateResult(r, qual3, val3); 720 721 table.close(); 722 } 723 724 // // Helper methods //// 725 726 private void validateResult(Object r) { 727 validateResult(r, QUALIFIER, VALUE); 728 } 729 730 private void validateResult(Object r1, byte[] qual, byte[] val) { 731 Result r = (Result)r1; 732 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual)); 733 byte[] value = r.getValue(BYTES_FAMILY, qual); 734 if (0 != Bytes.compareTo(val, value)) { 735 fail("Expected [" + Bytes.toStringBinary(val) 736 + "] but got [" + Bytes.toStringBinary(value) + "]"); 737 } 738 } 739 740 private List<Put> constructPutRequests() { 741 List<Put> puts = new ArrayList<>(); 742 for (byte[] k : KEYS) { 743 Put put = new Put(k); 744 put.addColumn(BYTES_FAMILY, QUALIFIER, VALUE); 745 puts.add(put); 746 } 747 return puts; 748 } 749 750 private void validateLoadedData(Table table) throws IOException { 751 // get the data back and validate that it is correct 752 LOG.info("Validating data on " + table); 753 List<Get> gets = new ArrayList<>(); 754 for (byte[] k : KEYS) { 755 Get get = new Get(k); 756 get.addColumn(BYTES_FAMILY, QUALIFIER); 757 gets.add(get); 758 } 759 int retryNum = 10; 760 Result[] results = null; 761 do { 762 results = table.get(gets); 763 boolean finished = true; 764 for (Result result : results) { 765 if (result.isEmpty()) { 766 finished = false; 767 break; 768 } 769 } 770 if (finished) { 771 break; 772 } 773 try { 774 Thread.sleep(10); 775 } catch (InterruptedException e) { 776 } 777 retryNum--; 778 } while (retryNum > 0); 779 780 if (retryNum == 0) { 781 fail("Timeout for validate data"); 782 } else { 783 if (results != null) { 784 for (Result r : results) { 785 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); 786 Assert.assertEquals(0, Bytes.compareTo(VALUE, r 787 .getValue(BYTES_FAMILY, QUALIFIER))); 788 } 789 LOG.info("Validating data on " + table + " successfully!"); 790 } 791 } 792 } 793 794 private void validateEmpty(Object r1) { 795 Result result = (Result)r1; 796 Assert.assertTrue(result != null); 797 Assert.assertTrue(result.isEmpty()); 798 } 799 800 private void validateSizeAndEmpty(Object[] results, int expectedSize) { 801 // Validate got back the same number of Result objects, all empty 802 Assert.assertEquals(expectedSize, results.length); 803 for (Object result : results) { 804 validateEmpty(result); 805 } 806 } 807 808 public static class MyMasterObserver implements MasterObserver, MasterCoprocessor { 809 private static final AtomicInteger postBalanceCount = new AtomicInteger(0); 810 private static final AtomicBoolean start = new AtomicBoolean(false); 811 812 @Override 813 public void start(CoprocessorEnvironment env) throws IOException { 814 start.set(true); 815 } 816 817 @Override 818 public Optional<MasterObserver> getMasterObserver() { 819 return Optional.of(this); 820 } 821 822 @Override 823 public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx, 824 List<RegionPlan> plans) throws IOException { 825 if (!plans.isEmpty()) { 826 postBalanceCount.incrementAndGet(); 827 } 828 } 829 } 830}