001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Optional; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.concurrent.atomic.AtomicInteger; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.CoprocessorEnvironment; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.Waiter; 044import org.apache.hadoop.hbase.codec.KeyValueCodec; 045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 046import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 047import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; 048import org.apache.hadoop.hbase.coprocessor.MasterObserver; 049import org.apache.hadoop.hbase.coprocessor.ObserverContext; 050import org.apache.hadoop.hbase.master.RegionPlan; 051import org.apache.hadoop.hbase.testclassification.FlakeyTests; 052import org.apache.hadoop.hbase.testclassification.MediumTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.JVMClusterUtil; 055import org.apache.hadoop.hbase.util.Threads; 056import org.junit.AfterClass; 057import org.junit.Assert; 058import org.junit.Before; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066@Category({MediumTests.class, FlakeyTests.class}) 067public class TestMultiParallel { 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestMultiParallel.class); 072 073 private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class); 074 075 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 076 private static final byte[] VALUE = Bytes.toBytes("value"); 077 private static final byte[] QUALIFIER = Bytes.toBytes("qual"); 078 private static final String FAMILY = "family"; 079 private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table"); 080 private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); 081 private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); 082 private static final byte [][] KEYS = makeKeys(); 083 084 private static final int slaves = 5; // also used for testing HTable pool size 085 private static Connection CONNECTION; 086 087 @BeforeClass 088 public static void beforeClass() throws Exception { 089 // Uncomment the following lines if more verbosity is needed for 090 // debugging (see HBASE-12285 for details). 091 //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); 092 //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); 093 //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); 094 UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, 095 KeyValueCodec.class.getCanonicalName()); 096 // Disable table on master for now as the feature is broken 097 //UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true); 098 // We used to ask for system tables on Master exclusively but not needed by test and doesn't 099 // work anyways -- so commented out. 100 // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true); 101 UTIL.getConfiguration() 102 .set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyMasterObserver.class.getName()); 103 UTIL.startMiniCluster(slaves); 104 Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY)); 105 UTIL.waitTableEnabled(TEST_TABLE); 106 t.close(); 107 CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration()); 108 assertTrue(MyMasterObserver.start.get()); 109 } 110 111 @AfterClass 112 public static void afterClass() throws Exception { 113 CONNECTION.close(); 114 UTIL.shutdownMiniCluster(); 115 } 116 117 @Before 118 public void before() throws Exception { 119 final int balanceCount = MyMasterObserver.postBalanceCount.get(); 120 LOG.info("before"); 121 if (UTIL.ensureSomeRegionServersAvailable(slaves)) { 122 // Distribute regions 123 UTIL.getMiniHBaseCluster().getMaster().balance(); 124 // Some plans are created. 125 if (MyMasterObserver.postBalanceCount.get() > balanceCount) { 126 // It is necessary to wait the move procedure to start. 127 // Otherwise, the next wait may pass immediately. 128 UTIL.waitFor(3 * 1000, 100, false, () -> 129 UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition() 130 ); 131 } 132 133 // Wait until completing balance 134 UTIL.waitUntilAllRegionsAssigned(TEST_TABLE); 135 } 136 LOG.info("before done"); 137 } 138 139 private static byte[][] makeKeys() { 140 byte [][] starterKeys = HBaseTestingUtility.KEYS; 141 // Create a "non-uniform" test set with the following characteristics: 142 // a) Unequal number of keys per region 143 144 // Don't use integer as a multiple, so that we have a number of keys that is 145 // not a multiple of the number of regions 146 int numKeys = (int) (starterKeys.length * 10.33F); 147 148 List<byte[]> keys = new ArrayList<>(); 149 for (int i = 0; i < numKeys; i++) { 150 int kIdx = i % starterKeys.length; 151 byte[] k = starterKeys[kIdx]; 152 byte[] cp = new byte[k.length + 1]; 153 System.arraycopy(k, 0, cp, 0, k.length); 154 cp[k.length] = new Integer(i % 256).byteValue(); 155 keys.add(cp); 156 } 157 158 // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which 159 // should work) 160 // c) keys are not in sorted order (within a region), to ensure that the 161 // sorting code and index mapping doesn't break the functionality 162 for (int i = 0; i < 100; i++) { 163 int kIdx = i % starterKeys.length; 164 byte[] k = starterKeys[kIdx]; 165 byte[] cp = new byte[k.length + 1]; 166 System.arraycopy(k, 0, cp, 0, k.length); 167 cp[k.length] = new Integer(i % 256).byteValue(); 168 keys.add(cp); 169 } 170 return keys.toArray(new byte [][] {new byte [] {}}); 171 } 172 173 174 /** 175 * This is for testing the active number of threads that were used while 176 * doing a batch operation. It inserts one row per region via the batch 177 * operation, and then checks the number of active threads. 178 * <p/> 179 * For HBASE-3553 180 */ 181 @Test 182 public void testActiveThreadsCount() throws Exception { 183 UTIL.getConfiguration().setLong("hbase.htable.threads.coresize", slaves + 1); 184 // Make sure max is at least as big as coresize; can be smaller in test context where 185 // we tune down thread sizes -- max could be < slaves + 1. 186 UTIL.getConfiguration().setLong("hbase.htable.threads.max", slaves + 1); 187 try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) { 188 ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration()); 189 try { 190 try (Table t = connection.getTable(TEST_TABLE, executor)) { 191 List<Put> puts = constructPutRequests(); // creates a Put for every region 192 t.batch(puts, null); 193 HashSet<ServerName> regionservers = new HashSet<>(); 194 try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) { 195 for (Row r : puts) { 196 HRegionLocation location = locator.getRegionLocation(r.getRow()); 197 regionservers.add(location.getServerName()); 198 } 199 } 200 assertEquals(regionservers.size(), executor.getLargestPoolSize()); 201 } 202 } finally { 203 executor.shutdownNow(); 204 } 205 } 206 } 207 208 @Test 209 public void testBatchWithGet() throws Exception { 210 LOG.info("test=testBatchWithGet"); 211 Table table = UTIL.getConnection().getTable(TEST_TABLE); 212 213 // load test data 214 List<Put> puts = constructPutRequests(); 215 table.batch(puts, null); 216 217 // create a list of gets and run it 218 List<Row> gets = new ArrayList<>(); 219 for (byte[] k : KEYS) { 220 Get get = new Get(k); 221 get.addColumn(BYTES_FAMILY, QUALIFIER); 222 gets.add(get); 223 } 224 Result[] multiRes = new Result[gets.size()]; 225 table.batch(gets, multiRes); 226 227 // Same gets using individual call API 228 List<Result> singleRes = new ArrayList<>(); 229 for (Row get : gets) { 230 singleRes.add(table.get((Get) get)); 231 } 232 // Compare results 233 Assert.assertEquals(singleRes.size(), multiRes.length); 234 for (int i = 0; i < singleRes.size(); i++) { 235 Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER)); 236 Cell[] singleKvs = singleRes.get(i).rawCells(); 237 Cell[] multiKvs = multiRes[i].rawCells(); 238 for (int j = 0; j < singleKvs.length; j++) { 239 Assert.assertEquals(singleKvs[j], multiKvs[j]); 240 Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), 241 CellUtil.cloneValue(multiKvs[j]))); 242 } 243 } 244 table.close(); 245 } 246 247 @Test 248 public void testBadFam() throws Exception { 249 LOG.info("test=testBadFam"); 250 Table table = UTIL.getConnection().getTable(TEST_TABLE); 251 252 List<Row> actions = new ArrayList<>(); 253 Put p = new Put(Bytes.toBytes("row1")); 254 p.addColumn(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value")); 255 actions.add(p); 256 p = new Put(Bytes.toBytes("row2")); 257 p.addColumn(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value")); 258 actions.add(p); 259 260 // row1 and row2 should be in the same region. 261 262 Object [] r = new Object[actions.size()]; 263 try { 264 table.batch(actions, r); 265 fail(); 266 } catch (RetriesExhaustedWithDetailsException ex) { 267 LOG.debug(ex.toString(), ex); 268 // good! 269 assertFalse(ex.mayHaveClusterIssues()); 270 } 271 assertEquals(2, r.length); 272 assertTrue(r[0] instanceof Throwable); 273 assertTrue(r[1] instanceof Result); 274 table.close(); 275 } 276 277 @Test 278 public void testFlushCommitsNoAbort() throws Exception { 279 LOG.info("test=testFlushCommitsNoAbort"); 280 doTestFlushCommits(false); 281 } 282 283 /** 284 * Only run one Multi test with a forced RegionServer abort. Otherwise, the 285 * unit tests will take an unnecessarily long time to run. 286 */ 287 @Test 288 public void testFlushCommitsWithAbort() throws Exception { 289 LOG.info("test=testFlushCommitsWithAbort"); 290 doTestFlushCommits(true); 291 } 292 293 /** 294 * Set table auto flush to false and test flushing commits 295 * @param doAbort true if abort one regionserver in the testing 296 */ 297 private void doTestFlushCommits(boolean doAbort) throws Exception { 298 // Load the data 299 LOG.info("get new table"); 300 Table table = UTIL.getConnection().getTable(TEST_TABLE); 301 302 LOG.info("constructPutRequests"); 303 List<Put> puts = constructPutRequests(); 304 table.put(puts); 305 LOG.info("puts"); 306 final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads() 307 .size(); 308 assert liveRScount > 0; 309 JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster() 310 .getLiveRegionServerThreads().get(0); 311 if (doAbort) { 312 liveRS.getRegionServer().abort("Aborting for tests", 313 new Exception("doTestFlushCommits")); 314 // If we wait for no regions being online after we abort the server, we 315 // could ensure the master has re-assigned the regions on killed server 316 // after writing successfully. It means the server we aborted is dead 317 // and detected by matser 318 while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) { 319 Thread.sleep(100); 320 } 321 // try putting more keys after the abort. same key/qual... just validating 322 // no exceptions thrown 323 puts = constructPutRequests(); 324 table.put(puts); 325 } 326 327 LOG.info("validating loaded data"); 328 validateLoadedData(table); 329 330 // Validate server and region count 331 List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads(); 332 int count = 0; 333 for (JVMClusterUtil.RegionServerThread t: liveRSs) { 334 count++; 335 LOG.info("Count=" + count + ", Alive=" + t.getRegionServer()); 336 } 337 LOG.info("Count=" + count); 338 Assert.assertEquals("Server count=" + count + ", abort=" + doAbort, 339 (doAbort ? (liveRScount - 1) : liveRScount), count); 340 if (doAbort) { 341 UTIL.getMiniHBaseCluster().waitOnRegionServer(0); 342 UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() { 343 @Override 344 public boolean evaluate() throws Exception { 345 // We disable regions on master so the count should be liveRScount - 1 346 return UTIL.getMiniHBaseCluster().getMaster() 347 .getClusterMetrics().getLiveServerMetrics().size() == liveRScount - 1; 348 } 349 }); 350 UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition()); 351 } 352 353 table.close(); 354 LOG.info("done"); 355 } 356 357 @Test 358 public void testBatchWithPut() throws Exception { 359 LOG.info("test=testBatchWithPut"); 360 Table table = CONNECTION.getTable(TEST_TABLE); 361 // put multiple rows using a batch 362 List<Put> puts = constructPutRequests(); 363 364 Object[] results = new Object[puts.size()]; 365 table.batch(puts, results); 366 validateSizeAndEmpty(results, KEYS.length); 367 368 if (true) { 369 int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size(); 370 assert liveRScount > 0; 371 JVMClusterUtil.RegionServerThread liveRS = 372 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0); 373 liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut")); 374 puts = constructPutRequests(); 375 try { 376 results = new Object[puts.size()]; 377 table.batch(puts, results); 378 } catch (RetriesExhaustedWithDetailsException ree) { 379 LOG.info(ree.getExhaustiveDescription()); 380 table.close(); 381 throw ree; 382 } 383 validateSizeAndEmpty(results, KEYS.length); 384 } 385 386 validateLoadedData(table); 387 table.close(); 388 } 389 390 @Test 391 public void testBatchWithDelete() throws Exception { 392 LOG.info("test=testBatchWithDelete"); 393 Table table = UTIL.getConnection().getTable(TEST_TABLE); 394 395 // Load some data 396 List<Put> puts = constructPutRequests(); 397 Object[] results = new Object[puts.size()]; 398 table.batch(puts, results); 399 validateSizeAndEmpty(results, KEYS.length); 400 401 // Deletes 402 List<Row> deletes = new ArrayList<>(); 403 for (int i = 0; i < KEYS.length; i++) { 404 Delete delete = new Delete(KEYS[i]); 405 delete.addFamily(BYTES_FAMILY); 406 deletes.add(delete); 407 } 408 results= new Object[deletes.size()]; 409 table.batch(deletes, results); 410 validateSizeAndEmpty(results, KEYS.length); 411 412 // Get to make sure ... 413 for (byte[] k : KEYS) { 414 Get get = new Get(k); 415 get.addColumn(BYTES_FAMILY, QUALIFIER); 416 Assert.assertFalse(table.exists(get)); 417 } 418 table.close(); 419 } 420 421 @Test 422 public void testHTableDeleteWithList() throws Exception { 423 LOG.info("test=testHTableDeleteWithList"); 424 Table table = UTIL.getConnection().getTable(TEST_TABLE); 425 426 // Load some data 427 List<Put> puts = constructPutRequests(); 428 Object[] results = new Object[puts.size()]; 429 table.batch(puts, results); 430 validateSizeAndEmpty(results, KEYS.length); 431 432 // Deletes 433 ArrayList<Delete> deletes = new ArrayList<>(); 434 for (int i = 0; i < KEYS.length; i++) { 435 Delete delete = new Delete(KEYS[i]); 436 delete.addFamily(BYTES_FAMILY); 437 deletes.add(delete); 438 } 439 table.delete(deletes); 440 Assert.assertTrue(deletes.isEmpty()); 441 442 // Get to make sure ... 443 for (byte[] k : KEYS) { 444 Get get = new Get(k); 445 get.addColumn(BYTES_FAMILY, QUALIFIER); 446 Assert.assertFalse(table.exists(get)); 447 } 448 table.close(); 449 } 450 451 @Test 452 public void testBatchWithManyColsInOneRowGetAndPut() throws Exception { 453 LOG.info("test=testBatchWithManyColsInOneRowGetAndPut"); 454 Table table = UTIL.getConnection().getTable(TEST_TABLE); 455 456 List<Row> puts = new ArrayList<>(); 457 for (int i = 0; i < 100; i++) { 458 Put put = new Put(ONE_ROW); 459 byte[] qual = Bytes.toBytes("column" + i); 460 put.addColumn(BYTES_FAMILY, qual, VALUE); 461 puts.add(put); 462 } 463 Object[] results = new Object[puts.size()]; 464 table.batch(puts, results); 465 466 // validate 467 validateSizeAndEmpty(results, 100); 468 469 // get the data back and validate that it is correct 470 List<Row> gets = new ArrayList<>(); 471 for (int i = 0; i < 100; i++) { 472 Get get = new Get(ONE_ROW); 473 byte[] qual = Bytes.toBytes("column" + i); 474 get.addColumn(BYTES_FAMILY, qual); 475 gets.add(get); 476 } 477 478 Object[] multiRes = new Object[gets.size()]; 479 table.batch(gets, multiRes); 480 481 int idx = 0; 482 for (Object r : multiRes) { 483 byte[] qual = Bytes.toBytes("column" + idx); 484 validateResult(r, qual, VALUE); 485 idx++; 486 } 487 table.close(); 488 } 489 490 @Test 491 public void testBatchWithIncrementAndAppend() throws Exception { 492 LOG.info("test=testBatchWithIncrementAndAppend"); 493 final byte[] QUAL1 = Bytes.toBytes("qual1"); 494 final byte[] QUAL2 = Bytes.toBytes("qual2"); 495 final byte[] QUAL3 = Bytes.toBytes("qual3"); 496 final byte[] QUAL4 = Bytes.toBytes("qual4"); 497 Table table = UTIL.getConnection().getTable(TEST_TABLE); 498 Delete d = new Delete(ONE_ROW); 499 table.delete(d); 500 Put put = new Put(ONE_ROW); 501 put.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc")); 502 put.addColumn(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L)); 503 table.put(put); 504 505 Increment inc = new Increment(ONE_ROW); 506 inc.addColumn(BYTES_FAMILY, QUAL2, 1); 507 inc.addColumn(BYTES_FAMILY, QUAL3, 1); 508 509 Append a = new Append(ONE_ROW); 510 a.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("def")); 511 a.addColumn(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz")); 512 List<Row> actions = new ArrayList<>(); 513 actions.add(inc); 514 actions.add(a); 515 516 Object[] multiRes = new Object[actions.size()]; 517 table.batch(actions, multiRes); 518 validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef")); 519 validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz")); 520 validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L)); 521 validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L)); 522 table.close(); 523 } 524 525 @Test 526 public void testNonceCollision() throws Exception { 527 LOG.info("test=testNonceCollision"); 528 final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); 529 Table table = connection.getTable(TEST_TABLE); 530 Put put = new Put(ONE_ROW); 531 put.addColumn(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L)); 532 533 // Replace nonce manager with the one that returns each nonce twice. 534 NonceGenerator cnm = new NonceGenerator() { 535 536 private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get(); 537 538 private long lastNonce = -1; 539 540 @Override 541 public synchronized long newNonce() { 542 long nonce = 0; 543 if (lastNonce == -1) { 544 lastNonce = nonce = delegate.newNonce(); 545 } else { 546 nonce = lastNonce; 547 lastNonce = -1L; 548 } 549 return nonce; 550 } 551 552 @Override 553 public long getNonceGroup() { 554 return delegate.getNonceGroup(); 555 } 556 }; 557 558 NonceGenerator oldCnm = 559 ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm); 560 561 // First test sequential requests. 562 try { 563 Increment inc = new Increment(ONE_ROW); 564 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 565 table.increment(inc); 566 567 // duplicate increment 568 inc = new Increment(ONE_ROW); 569 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 570 Result result = table.increment(inc); 571 validateResult(result, QUALIFIER, Bytes.toBytes(1L)); 572 573 Get get = new Get(ONE_ROW); 574 get.addColumn(BYTES_FAMILY, QUALIFIER); 575 result = table.get(get); 576 validateResult(result, QUALIFIER, Bytes.toBytes(1L)); 577 578 // Now run a bunch of requests in parallel, exactly half should succeed. 579 int numRequests = 40; 580 final CountDownLatch startedLatch = new CountDownLatch(numRequests); 581 final CountDownLatch startLatch = new CountDownLatch(1); 582 final CountDownLatch doneLatch = new CountDownLatch(numRequests); 583 for (int i = 0; i < numRequests; ++i) { 584 Runnable r = new Runnable() { 585 @Override 586 public void run() { 587 Table table = null; 588 try { 589 table = connection.getTable(TEST_TABLE); 590 } catch (IOException e) { 591 fail("Not expected"); 592 } 593 Increment inc = new Increment(ONE_ROW); 594 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 595 startedLatch.countDown(); 596 try { 597 startLatch.await(); 598 } catch (InterruptedException e) { 599 fail("Not expected"); 600 } 601 try { 602 table.increment(inc); 603 } catch (IOException ioEx) { 604 fail("Not expected"); 605 } 606 doneLatch.countDown(); 607 } 608 }; 609 Threads.setDaemonThreadRunning(new Thread(r)); 610 } 611 startedLatch.await(); // Wait until all threads are ready... 612 startLatch.countDown(); // ...and unleash the herd! 613 doneLatch.await(); 614 // Now verify 615 get = new Get(ONE_ROW); 616 get.addColumn(BYTES_FAMILY, QUALIFIER); 617 result = table.get(get); 618 validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L)); 619 table.close(); 620 } finally { 621 ConnectionImplementation.injectNonceGeneratorForTesting((ClusterConnection) connection, oldCnm); 622 } 623 } 624 625 @Test 626 public void testBatchWithMixedActions() throws Exception { 627 LOG.info("test=testBatchWithMixedActions"); 628 Table table = UTIL.getConnection().getTable(TEST_TABLE); 629 630 // Load some data to start 631 List<Put> puts = constructPutRequests(); 632 Object[] results = new Object[puts.size()]; 633 table.batch(puts, results); 634 validateSizeAndEmpty(results, KEYS.length); 635 636 // Batch: get, get, put(new col), delete, get, get of put, get of deleted, 637 // put 638 List<Row> actions = new ArrayList<>(); 639 640 byte[] qual2 = Bytes.toBytes("qual2"); 641 byte[] val2 = Bytes.toBytes("putvalue2"); 642 643 // 0 get 644 Get get = new Get(KEYS[10]); 645 get.addColumn(BYTES_FAMILY, QUALIFIER); 646 actions.add(get); 647 648 // 1 get 649 get = new Get(KEYS[11]); 650 get.addColumn(BYTES_FAMILY, QUALIFIER); 651 actions.add(get); 652 653 // 2 put of new column 654 Put put = new Put(KEYS[10]); 655 put.addColumn(BYTES_FAMILY, qual2, val2); 656 actions.add(put); 657 658 // 3 delete 659 Delete delete = new Delete(KEYS[20]); 660 delete.addFamily(BYTES_FAMILY); 661 actions.add(delete); 662 663 // 4 get 664 get = new Get(KEYS[30]); 665 get.addColumn(BYTES_FAMILY, QUALIFIER); 666 actions.add(get); 667 668 // There used to be a 'get' of a previous put here, but removed 669 // since this API really cannot guarantee order in terms of mixed 670 // get/puts. 671 672 // 5 put of new column 673 put = new Put(KEYS[40]); 674 put.addColumn(BYTES_FAMILY, qual2, val2); 675 actions.add(put); 676 677 // 6 RowMutations 678 RowMutations rm = new RowMutations(KEYS[50]); 679 put = new Put(KEYS[50]); 680 put.addColumn(BYTES_FAMILY, qual2, val2); 681 rm.add((Mutation) put); 682 byte[] qual3 = Bytes.toBytes("qual3"); 683 byte[] val3 = Bytes.toBytes("putvalue3"); 684 put = new Put(KEYS[50]); 685 put.addColumn(BYTES_FAMILY, qual3, val3); 686 rm.add((Mutation) put); 687 actions.add(rm); 688 689 // 7 Add another Get to the mixed sequence after RowMutations 690 get = new Get(KEYS[10]); 691 get.addColumn(BYTES_FAMILY, QUALIFIER); 692 actions.add(get); 693 694 results = new Object[actions.size()]; 695 table.batch(actions, results); 696 697 // Validation 698 699 validateResult(results[0]); 700 validateResult(results[1]); 701 validateEmpty(results[3]); 702 validateResult(results[4]); 703 validateEmpty(results[5]); 704 validateEmpty(results[6]); 705 validateResult(results[7]); 706 707 // validate last put, externally from the batch 708 get = new Get(KEYS[40]); 709 get.addColumn(BYTES_FAMILY, qual2); 710 Result r = table.get(get); 711 validateResult(r, qual2, val2); 712 713 // validate last RowMutations, externally from the batch 714 get = new Get(KEYS[50]); 715 get.addColumn(BYTES_FAMILY, qual2); 716 r = table.get(get); 717 validateResult(r, qual2, val2); 718 719 get = new Get(KEYS[50]); 720 get.addColumn(BYTES_FAMILY, qual3); 721 r = table.get(get); 722 validateResult(r, qual3, val3); 723 724 table.close(); 725 } 726 727 // // Helper methods //// 728 729 private void validateResult(Object r) { 730 validateResult(r, QUALIFIER, VALUE); 731 } 732 733 private void validateResult(Object r1, byte[] qual, byte[] val) { 734 Result r = (Result)r1; 735 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual)); 736 byte[] value = r.getValue(BYTES_FAMILY, qual); 737 if (0 != Bytes.compareTo(val, value)) { 738 fail("Expected [" + Bytes.toStringBinary(val) 739 + "] but got [" + Bytes.toStringBinary(value) + "]"); 740 } 741 } 742 743 private List<Put> constructPutRequests() { 744 List<Put> puts = new ArrayList<>(); 745 for (byte[] k : KEYS) { 746 Put put = new Put(k); 747 put.addColumn(BYTES_FAMILY, QUALIFIER, VALUE); 748 puts.add(put); 749 } 750 return puts; 751 } 752 753 private void validateLoadedData(Table table) throws IOException { 754 // get the data back and validate that it is correct 755 LOG.info("Validating data on " + table); 756 List<Get> gets = new ArrayList<>(); 757 for (byte[] k : KEYS) { 758 Get get = new Get(k); 759 get.addColumn(BYTES_FAMILY, QUALIFIER); 760 gets.add(get); 761 } 762 int retryNum = 10; 763 Result[] results = null; 764 do { 765 results = table.get(gets); 766 boolean finished = true; 767 for (Result result : results) { 768 if (result.isEmpty()) { 769 finished = false; 770 break; 771 } 772 } 773 if (finished) { 774 break; 775 } 776 try { 777 Thread.sleep(10); 778 } catch (InterruptedException e) { 779 } 780 retryNum--; 781 } while (retryNum > 0); 782 783 if (retryNum == 0) { 784 fail("Timeout for validate data"); 785 } else { 786 if (results != null) { 787 for (Result r : results) { 788 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); 789 Assert.assertEquals(0, Bytes.compareTo(VALUE, r 790 .getValue(BYTES_FAMILY, QUALIFIER))); 791 } 792 LOG.info("Validating data on " + table + " successfully!"); 793 } 794 } 795 } 796 797 private void validateEmpty(Object r1) { 798 Result result = (Result)r1; 799 Assert.assertTrue(result != null); 800 Assert.assertTrue(result.isEmpty()); 801 } 802 803 private void validateSizeAndEmpty(Object[] results, int expectedSize) { 804 // Validate got back the same number of Result objects, all empty 805 Assert.assertEquals(expectedSize, results.length); 806 for (Object result : results) { 807 validateEmpty(result); 808 } 809 } 810 811 public static class MyMasterObserver implements MasterObserver, MasterCoprocessor { 812 private static final AtomicInteger postBalanceCount = new AtomicInteger(0); 813 private static final AtomicBoolean start = new AtomicBoolean(false); 814 815 @Override 816 public void start(CoprocessorEnvironment env) throws IOException { 817 start.set(true); 818 } 819 820 @Override 821 public Optional<MasterObserver> getMasterObserver() { 822 return Optional.of(this); 823 } 824 825 @Override 826 public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx, 827 List<RegionPlan> plans) throws IOException { 828 if (!plans.isEmpty()) { 829 postBalanceCount.incrementAndGet(); 830 } 831 } 832 } 833}