/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests the multi/batch client API (gets, puts, deletes, increments, appends and mixed
 * batches) against a multi-region table spread over a mini cluster, including behavior
 * when a RegionServer is aborted mid-test.
 */
@Tag(MediumTests.TAG)
@Tag(FlakeyTests.TAG)
public class TestMultiParallel {

  private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final byte[] VALUE = Bytes.toBytes("value");
  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
  private static final String FAMILY = "family";
  private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
  private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
  private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
  private static final byte[][] KEYS = makeKeys();

  private static final int slaves = 5; // also used for testing HTable pool size
  private static Connection CONNECTION;

  /**
   * Starts a {@code slaves}-node mini cluster with the KeyValue RPC codec and the
   * {@link MyMasterObserver} coprocessor installed, and creates the multi-region test table.
   */
  @BeforeAll
  public static void beforeClass() throws Exception {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
    UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
      KeyValueCodec.class.getCanonicalName());
    // Disable table on master for now as the feature is broken
    // UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
    // We used to ask for system tables on Master exclusively but not needed by test and doesn't
    // work anyways -- so commented out.
    // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
    UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
      MyMasterObserver.class.getName());
    UTIL.startMiniCluster(slaves);
    Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
    UTIL.waitTableEnabled(TEST_TABLE);
    t.close();
    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
    assertTrue(MyMasterObserver.start.get());
  }

  @AfterAll
  public static void afterClass() throws Exception {
    CONNECTION.close();
    UTIL.shutdownMiniCluster();
  }

  /**
   * Makes sure the expected number of RegionServers are alive before each test (earlier tests
   * may abort one), rebalancing and waiting for all regions of the test table to be assigned.
   */
  @BeforeEach
  public void before() throws Exception {
    final int balanceCount = MyMasterObserver.postBalanceCount.get();
    LOG.info("before");
    if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
      // Distribute regions
      UTIL.getMiniHBaseCluster().getMaster().balance();
      // Some plans are created.
      if (MyMasterObserver.postBalanceCount.get() > balanceCount) {
        // It is necessary to wait the move procedure to start.
        // Otherwise, the next wait may pass immediately.
        UTIL.waitFor(3 * 1000, 100, false, () -> UTIL.getMiniHBaseCluster().getMaster()
          .getAssignmentManager().hasRegionsInTransition());
      }

      // Wait until completing balance
      UTIL.waitUntilAllRegionsAssigned(TEST_TABLE);
    }
    LOG.info("before done");
  }

  /**
   * Builds the row-key fixture for all tests.
   * @return keys that are unevenly spread over regions, contain duplicates, and are unsorted
   */
  private static byte[][] makeKeys() {
    byte[][] starterKeys = HBaseTestingUtil.KEYS;
    // Create a "non-uniform" test set with the following characteristics:
    // a) Unequal number of keys per region

    // Don't use integer as a multiple, so that we have a number of keys that is
    // not a multiple of the number of regions
    int numKeys = (int) (starterKeys.length * 10.33F);

    List<byte[]> keys = new ArrayList<>();
    for (int i = 0; i < numKeys; i++) {
      int kIdx = i % starterKeys.length;
      byte[] k = starterKeys[kIdx];
      byte[] cp = new byte[k.length + 1];
      System.arraycopy(k, 0, cp, 0, k.length);
      cp[k.length] = (byte) (i % 256);
      keys.add(cp);
    }

    // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
    // should work)
    // c) keys are not in sorted order (within a region), to ensure that the
    // sorting code and index mapping doesn't break the functionality
    for (int i = 0; i < 100; i++) {
      int kIdx = i % starterKeys.length;
      byte[] k = starterKeys[kIdx];
      byte[] cp = new byte[k.length + 1];
      System.arraycopy(k, 0, cp, 0, k.length);
      cp[k.length] = (byte) (i % 256);
      keys.add(cp);
    }
    // Zero-length array: toArray always allocates a correctly sized result.
    return keys.toArray(new byte[0][]);
  }

  /**
   * Verifies that a batch of Gets returns the same results as the equivalent individual Gets.
   */
  @Test
  public void testBatchWithGet() throws Exception {
    LOG.info("test=testBatchWithGet");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    // load test data
    List<Put> puts = constructPutRequests();
    table.batch(puts, null);

    // create a list of gets and run it
    List<Row> gets = new ArrayList<>();
    for (byte[] k : KEYS) {
      Get get = new Get(k);
      get.addColumn(BYTES_FAMILY, QUALIFIER);
      gets.add(get);
    }
    Result[] multiRes = new Result[gets.size()];
    table.batch(gets, multiRes);

    // Same gets using individual call API
    List<Result> singleRes = new ArrayList<>();
    for (Row get : gets) {
      singleRes.add(table.get((Get) get));
    }
    // Compare results
    assertEquals(singleRes.size(), multiRes.length);
    for (int i = 0; i < singleRes.size(); i++) {
      assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
      Cell[] singleKvs = singleRes.get(i).rawCells();
      Cell[] multiKvs = multiRes[i].rawCells();
      for (int j = 0; j < singleKvs.length; j++) {
        assertEquals(singleKvs[j], multiKvs[j]);
        assertEquals(0,
          Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), CellUtil.cloneValue(multiKvs[j])));
      }
    }
    table.close();
  }

  /**
   * A batch containing a Put to a nonexistent family must fail that action only; the other
   * action in the same region still succeeds and the per-action results reflect both outcomes.
   */
  @Test
  public void testBadFam() throws Exception {
    LOG.info("test=testBadFam");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    List<Row> actions = new ArrayList<>();
    Put p = new Put(Bytes.toBytes("row1"));
    p.addColumn(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
    actions.add(p);
    p = new Put(Bytes.toBytes("row2"));
    p.addColumn(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
    actions.add(p);

    // row1 and row2 should be in the same region.

    Object[] r = new Object[actions.size()];
    try {
      table.batch(actions, r);
      fail();
    } catch (RetriesExhaustedException ex) {
      // expected
    }
    assertEquals(2, r.length);
    assertTrue(r[0] instanceof Throwable);
    assertTrue(r[1] instanceof Result);
    table.close();
  }

  @Test
  public void testFlushCommitsNoAbort() throws Exception {
    LOG.info("test=testFlushCommitsNoAbort");
    doTestFlushCommits(false);
  }

  /**
   * Only run one Multi test with a forced RegionServer abort. Otherwise, the unit tests will take
   * an unnecessarily long time to run.
   */
  @Test
  public void testFlushCommitsWithAbort() throws Exception {
    LOG.info("test=testFlushCommitsWithAbort");
    doTestFlushCommits(true);
  }

  /**
   * Set table auto flush to false and test flushing commits
   * @param doAbort true if abort one regionserver in the testing
   */
  private void doTestFlushCommits(boolean doAbort) throws Exception {
    // Load the data
    LOG.info("get new table");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    LOG.info("constructPutRequests");
    List<Put> puts = constructPutRequests();
    table.put(puts);
    LOG.info("puts");
    final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
    // Use a JUnit assertion: a bare Java 'assert' is silently skipped unless -ea is set.
    assertTrue(liveRScount > 0);
    JVMClusterUtil.RegionServerThread liveRS =
      UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
    if (doAbort) {
      liveRS.getRegionServer().abort("Aborting for tests", new Exception("doTestFlushCommits"));
      // If we wait for no regions being online after we abort the server, we
      // could ensure the master has re-assigned the regions on killed server
      // after writing successfully. It means the server we aborted is dead
      // and detected by master
      while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
        Thread.sleep(100);
      }
      // try putting more keys after the abort. same key/qual... just validating
      // no exceptions thrown
      puts = constructPutRequests();
      table.put(puts);
    }

    LOG.info("validating loaded data");
    validateLoadedData(table);

    // Validate server and region count
    List<JVMClusterUtil.RegionServerThread> liveRSs =
      UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
    int count = 0;
    for (JVMClusterUtil.RegionServerThread t : liveRSs) {
      count++;
      LOG.info("Count={}, Alive={}", count, t.getRegionServer());
    }
    LOG.info("Count={}", count);
    assertEquals((doAbort ? (liveRScount - 1) : liveRScount), count,
      "Server count=" + count + ", abort=" + doAbort);
    if (doAbort) {
      UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
      // We disable regions on master so the count should be liveRScount - 1
      UTIL.waitFor(15 * 1000,
        () -> UTIL.getMiniHBaseCluster().getMaster().getClusterMetrics().getLiveServerMetrics()
          .size() == liveRScount - 1);
      UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
    }

    table.close();
    LOG.info("done");
  }

  /**
   * Batched Puts succeed both on a healthy cluster and after a RegionServer abort (the client
   * retries against the re-assigned regions).
   */
  @Test
  public void testBatchWithPut() throws Exception {
    LOG.info("test=testBatchWithPut");
    Table table = CONNECTION.getTable(TEST_TABLE);
    // put multiple rows using a batch
    List<Put> puts = constructPutRequests();

    Object[] results = new Object[puts.size()];
    table.batch(puts, results);
    validateSizeAndEmpty(results, KEYS.length);

    // Abort a server and re-run the same batch; removed the dead 'if (true)' wrapper
    // that used to surround this section.
    int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
    // Use a JUnit assertion: a bare Java 'assert' is silently skipped unless -ea is set.
    assertTrue(liveRScount > 0);
    JVMClusterUtil.RegionServerThread liveRS =
      UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
    liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
    puts = constructPutRequests();
    try {
      results = new Object[puts.size()];
      table.batch(puts, results);
    } catch (RetriesExhaustedWithDetailsException ree) {
      LOG.info(ree.getExhaustiveDescription());
      table.close();
      throw ree;
    }
    validateSizeAndEmpty(results, KEYS.length);

    validateLoadedData(table);
    table.close();
  }

  @Test
  public void testBatchWithDelete() throws Exception {
    LOG.info("test=testBatchWithDelete");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    // Load some data
    List<Put> puts = constructPutRequests();
    Object[] results = new Object[puts.size()];
    table.batch(puts, results);
    validateSizeAndEmpty(results, KEYS.length);

    // Deletes
    List<Row> deletes = new ArrayList<>();
    for (int i = 0; i < KEYS.length; i++) {
      Delete delete = new Delete(KEYS[i]);
      delete.addFamily(BYTES_FAMILY);
      deletes.add(delete);
    }
    results = new Object[deletes.size()];
    table.batch(deletes, results);
    validateSizeAndEmpty(results, KEYS.length);

    // Get to make sure ...
    for (byte[] k : KEYS) {
      Get get = new Get(k);
      get.addColumn(BYTES_FAMILY, QUALIFIER);
      assertFalse(table.exists(get));
    }
    table.close();
  }

  @Test
  public void testHTableDeleteWithList() throws Exception {
    LOG.info("test=testHTableDeleteWithList");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    // Load some data
    List<Put> puts = constructPutRequests();
    Object[] results = new Object[puts.size()];
    table.batch(puts, results);
    validateSizeAndEmpty(results, KEYS.length);

    // Deletes
    ArrayList<Delete> deletes = new ArrayList<>();
    for (int i = 0; i < KEYS.length; i++) {
      Delete delete = new Delete(KEYS[i]);
      delete.addFamily(BYTES_FAMILY);
      deletes.add(delete);
    }
    table.delete(deletes);

    // Get to make sure ...
    for (byte[] k : KEYS) {
      Get get = new Get(k);
      get.addColumn(BYTES_FAMILY, QUALIFIER);
      assertFalse(table.exists(get));
    }
    table.close();
  }

  @Test
  public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
    LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    List<Row> puts = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      Put put = new Put(ONE_ROW);
      byte[] qual = Bytes.toBytes("column" + i);
      put.addColumn(BYTES_FAMILY, qual, VALUE);
      puts.add(put);
    }
    Object[] results = new Object[puts.size()];
    table.batch(puts, results);

    // validate
    validateSizeAndEmpty(results, 100);

    // get the data back and validate that it is correct
    List<Row> gets = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      Get get = new Get(ONE_ROW);
      byte[] qual = Bytes.toBytes("column" + i);
      get.addColumn(BYTES_FAMILY, qual);
      gets.add(get);
    }

    Object[] multiRes = new Object[gets.size()];
    table.batch(gets, multiRes);

    int idx = 0;
    for (Object r : multiRes) {
      byte[] qual = Bytes.toBytes("column" + idx);
      validateResult(r, qual, VALUE);
      idx++;
    }
    table.close();
  }

  @Test
  public void testBatchWithIncrementAndAppend() throws Exception {
    LOG.info("test=testBatchWithIncrementAndAppend");
    final byte[] QUAL1 = Bytes.toBytes("qual1");
    final byte[] QUAL2 = Bytes.toBytes("qual2");
    final byte[] QUAL3 = Bytes.toBytes("qual3");
    final byte[] QUAL4 = Bytes.toBytes("qual4");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);
    Delete d = new Delete(ONE_ROW);
    table.delete(d);
    Put put = new Put(ONE_ROW);
    put.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
    put.addColumn(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
    table.put(put);

    Increment inc = new Increment(ONE_ROW);
    inc.addColumn(BYTES_FAMILY, QUAL2, 1);
    inc.addColumn(BYTES_FAMILY, QUAL3, 1);

    Append a = new Append(ONE_ROW);
    a.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
    a.addColumn(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
    List<Row> actions = new ArrayList<>();
    actions.add(inc);
    actions.add(a);

    Object[] multiRes = new Object[actions.size()];
    table.batch(actions, multiRes);
    validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
    validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
    validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
    validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
    table.close();
  }

  @Test
  public void testBatchWithMixedActions() throws Exception {
    LOG.info("test=testBatchWithMixedActions");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    // Load some data to start
    List<Put> puts = constructPutRequests();
    Object[] results = new Object[puts.size()];
    table.batch(puts, results);
    validateSizeAndEmpty(results, KEYS.length);

    // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
    // put
    List<Row> actions = new ArrayList<>();

    byte[] qual2 = Bytes.toBytes("qual2");
    byte[] val2 = Bytes.toBytes("putvalue2");

    // 0 get
    Get get = new Get(KEYS[10]);
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    // 1 get
    get = new Get(KEYS[11]);
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    // 2 put of new column
    Put put = new Put(KEYS[10]);
    put.addColumn(BYTES_FAMILY, qual2, val2);
    actions.add(put);

    // 3 delete
    Delete delete = new Delete(KEYS[20]);
    delete.addFamily(BYTES_FAMILY);
    actions.add(delete);

    // 4 get
    get = new Get(KEYS[30]);
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    // There used to be a 'get' of a previous put here, but removed
    // since this API really cannot guarantee order in terms of mixed
    // get/puts.

    // 5 put of new column
    put = new Put(KEYS[40]);
    put.addColumn(BYTES_FAMILY, qual2, val2);
    actions.add(put);

    // 6 RowMutations
    RowMutations rm = new RowMutations(KEYS[50]);
    put = new Put(KEYS[50]);
    put.addColumn(BYTES_FAMILY, qual2, val2);
    rm.add((Mutation) put);
    byte[] qual3 = Bytes.toBytes("qual3");
    byte[] val3 = Bytes.toBytes("putvalue3");
    put = new Put(KEYS[50]);
    put.addColumn(BYTES_FAMILY, qual3, val3);
    rm.add((Mutation) put);
    actions.add(rm);

    // 7 Add another Get to the mixed sequence after RowMutations
    get = new Get(KEYS[10]);
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    results = new Object[actions.size()];
    table.batch(actions, results);

    // Validation

    validateResult(results[0]);
    validateResult(results[1]);
    validateEmpty(results[3]);
    validateResult(results[4]);
    validateEmpty(results[5]);
    validateEmpty(results[6]);
    validateResult(results[7]);

    // validate last put, externally from the batch
    get = new Get(KEYS[40]);
    get.addColumn(BYTES_FAMILY, qual2);
    Result r = table.get(get);
    validateResult(r, qual2, val2);

    // validate last RowMutations, externally from the batch
    get = new Get(KEYS[50]);
    get.addColumn(BYTES_FAMILY, qual2);
    r = table.get(get);
    validateResult(r, qual2, val2);

    get = new Get(KEYS[50]);
    get.addColumn(BYTES_FAMILY, qual3);
    r = table.get(get);
    validateResult(r, qual3, val3);

    table.close();
  }

  // // Helper methods ////

  /** Asserts that {@code r} contains the default QUALIFIER/VALUE pair. */
  private void validateResult(Object r) {
    validateResult(r, QUALIFIER, VALUE);
  }

  /**
   * Asserts that {@code r1} (a {@link Result}) contains {@code val} under
   * {@code BYTES_FAMILY:qual}.
   */
  private void validateResult(Object r1, byte[] qual, byte[] val) {
    Result r = (Result) r1;
    assertTrue(r.containsColumn(BYTES_FAMILY, qual));
    byte[] value = r.getValue(BYTES_FAMILY, qual);
    if (0 != Bytes.compareTo(val, value)) {
      fail("Expected [" + Bytes.toStringBinary(val) + "] but got [" + Bytes.toStringBinary(value)
        + "]");
    }
  }

  /** Builds one Put per key in {@link #KEYS}, each writing QUALIFIER=VALUE. */
  private List<Put> constructPutRequests() {
    List<Put> puts = new ArrayList<>();
    for (byte[] k : KEYS) {
      Put put = new Put(k);
      put.addColumn(BYTES_FAMILY, QUALIFIER, VALUE);
      puts.add(put);
    }
    return puts;
  }

  /**
   * Verifies every key in {@link #KEYS} is readable with the expected value, retrying a few
   * times to allow in-flight writes to become visible.
   */
  private void validateLoadedData(Table table) throws IOException {
    // get the data back and validate that it is correct
    LOG.info("Validating data on {}", table);
    List<Get> gets = new ArrayList<>();
    for (byte[] k : KEYS) {
      Get get = new Get(k);
      get.addColumn(BYTES_FAMILY, QUALIFIER);
      gets.add(get);
    }
    int retryNum = 10;
    Result[] results = null;
    do {
      results = table.get(gets);
      boolean finished = true;
      for (Result result : results) {
        if (result.isEmpty()) {
          finished = false;
          break;
        }
      }
      if (finished) {
        break;
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        // Restore the interrupt status instead of swallowing it, and abort the retry loop.
        Thread.currentThread().interrupt();
        fail("Interrupted while waiting for data to become visible");
      }
      retryNum--;
    } while (retryNum > 0);

    if (retryNum == 0) {
      fail("Timeout for validate data");
    } else {
      if (results != null) {
        for (Result r : results) {
          assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
          assertEquals(0, Bytes.compareTo(VALUE, r.getValue(BYTES_FAMILY, QUALIFIER)));
        }
        LOG.info("Validating data on {} successfully!", table);
      }
    }
  }

  /** Asserts that {@code r1} is a non-null, empty {@link Result}. */
  private void validateEmpty(Object r1) {
    Result result = (Result) r1;
    assertNotNull(result);
    assertTrue(result.isEmpty());
  }

  /** Asserts {@code results} has {@code expectedSize} entries, all empty Results. */
  private void validateSizeAndEmpty(Object[] results, int expectedSize) {
    // Validate got back the same number of Result objects, all empty
    assertEquals(expectedSize, results.length);
    for (Object result : results) {
      validateEmpty(result);
    }
  }

  /**
   * Master coprocessor that records that it was started and counts balancer runs that
   * produced at least one region plan; used by {@link #before()} to detect balancing.
   */
  public static class MyMasterObserver implements MasterObserver, MasterCoprocessor {
    private static final AtomicInteger postBalanceCount = new AtomicInteger(0);
    private static final AtomicBoolean start = new AtomicBoolean(false);

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
      start.set(true);
    }

    @Override
    public Optional<MasterObserver> getMasterObserver() {
      return Optional.of(this);
    }

    @Override
    public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx,
      BalanceRequest request, List<RegionPlan> plans) throws IOException {
      // Only count balances that actually moved something.
      if (!plans.isEmpty()) {
        postBalanceCount.incrementAndGet();
      }
    }
  }
}