001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.junit.jupiter.api.Assertions.fail;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.atomic.AtomicInteger;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.CoprocessorEnvironment;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.Waiter;
038import org.apache.hadoop.hbase.codec.KeyValueCodec;
039import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
040import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
041import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
042import org.apache.hadoop.hbase.coprocessor.MasterObserver;
043import org.apache.hadoop.hbase.coprocessor.ObserverContext;
044import org.apache.hadoop.hbase.master.RegionPlan;
045import org.apache.hadoop.hbase.testclassification.FlakeyTests;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.JVMClusterUtil;
049import org.junit.jupiter.api.AfterAll;
050import org.junit.jupiter.api.BeforeAll;
051import org.junit.jupiter.api.BeforeEach;
052import org.junit.jupiter.api.Tag;
053import org.junit.jupiter.api.Test;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057@Tag(MediumTests.TAG)
058@Tag(FlakeyTests.TAG)
059public class TestMultiParallel {
060
  private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class);

  // Shared test harness; the mini cluster is started in beforeClass() and stopped in afterClass().
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  // Value and qualifier written by constructPutRequests() and checked by the validate* helpers.
  private static final byte[] VALUE = Bytes.toBytes("value");
  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
  private static final String FAMILY = "family";
  private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
  private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
  // Row used by the tests that send many actions against one single row.
  private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
  // Row keys used by the batch tests; built once by makeKeys().
  private static final byte[][] KEYS = makeKeys();

  // Passed to startMiniCluster() and ensureSomeRegionServersAvailable().
  private static final int slaves = 5; // also used for testing HTable pool size
  // Connection created in beforeClass() and closed in afterClass(); used by testBatchWithPut().
  private static Connection CONNECTION;
074
075  @BeforeAll
076  public static void beforeClass() throws Exception {
077    // Uncomment the following lines if more verbosity is needed for
078    // debugging (see HBASE-12285 for details).
079    // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
080    // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
081    // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
082    UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
083      KeyValueCodec.class.getCanonicalName());
084    // Disable table on master for now as the feature is broken
085    // UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
086    // We used to ask for system tables on Master exclusively but not needed by test and doesn't
087    // work anyways -- so commented out.
088    // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
089    UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
090      MyMasterObserver.class.getName());
091    UTIL.startMiniCluster(slaves);
092    Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
093    UTIL.waitTableEnabled(TEST_TABLE);
094    t.close();
095    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
096    assertTrue(MyMasterObserver.start.get());
097  }
098
099  @AfterAll
100  public static void afterClass() throws Exception {
101    CONNECTION.close();
102    UTIL.shutdownMiniCluster();
103  }
104
105  @BeforeEach
106  public void before() throws Exception {
107    final int balanceCount = MyMasterObserver.postBalanceCount.get();
108    LOG.info("before");
109    if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
110      // Distribute regions
111      UTIL.getMiniHBaseCluster().getMaster().balance();
112      // Some plans are created.
113      if (MyMasterObserver.postBalanceCount.get() > balanceCount) {
114        // It is necessary to wait the move procedure to start.
115        // Otherwise, the next wait may pass immediately.
116        UTIL.waitFor(3 * 1000, 100, false, () -> UTIL.getMiniHBaseCluster().getMaster()
117          .getAssignmentManager().hasRegionsInTransition());
118      }
119
120      // Wait until completing balance
121      UTIL.waitUntilAllRegionsAssigned(TEST_TABLE);
122    }
123    LOG.info("before done");
124  }
125
126  private static byte[][] makeKeys() {
127    byte[][] starterKeys = HBaseTestingUtil.KEYS;
128    // Create a "non-uniform" test set with the following characteristics:
129    // a) Unequal number of keys per region
130
131    // Don't use integer as a multiple, so that we have a number of keys that is
132    // not a multiple of the number of regions
133    int numKeys = (int) (starterKeys.length * 10.33F);
134
135    List<byte[]> keys = new ArrayList<>();
136    for (int i = 0; i < numKeys; i++) {
137      int kIdx = i % starterKeys.length;
138      byte[] k = starterKeys[kIdx];
139      byte[] cp = new byte[k.length + 1];
140      System.arraycopy(k, 0, cp, 0, k.length);
141      cp[k.length] = (byte) (i % 256);
142      keys.add(cp);
143    }
144
145    // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
146    // should work)
147    // c) keys are not in sorted order (within a region), to ensure that the
148    // sorting code and index mapping doesn't break the functionality
149    for (int i = 0; i < 100; i++) {
150      int kIdx = i % starterKeys.length;
151      byte[] k = starterKeys[kIdx];
152      byte[] cp = new byte[k.length + 1];
153      System.arraycopy(k, 0, cp, 0, k.length);
154      cp[k.length] = (byte) (i % 256);
155      keys.add(cp);
156    }
157    return keys.toArray(new byte[][] { new byte[] {} });
158  }
159
160  @Test
161  public void testBatchWithGet() throws Exception {
162    LOG.info("test=testBatchWithGet");
163    Table table = UTIL.getConnection().getTable(TEST_TABLE);
164
165    // load test data
166    List<Put> puts = constructPutRequests();
167    table.batch(puts, null);
168
169    // create a list of gets and run it
170    List<Row> gets = new ArrayList<>();
171    for (byte[] k : KEYS) {
172      Get get = new Get(k);
173      get.addColumn(BYTES_FAMILY, QUALIFIER);
174      gets.add(get);
175    }
176    Result[] multiRes = new Result[gets.size()];
177    table.batch(gets, multiRes);
178
179    // Same gets using individual call API
180    List<Result> singleRes = new ArrayList<>();
181    for (Row get : gets) {
182      singleRes.add(table.get((Get) get));
183    }
184    // Compare results
185    assertEquals(singleRes.size(), multiRes.length);
186    for (int i = 0; i < singleRes.size(); i++) {
187      assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
188      Cell[] singleKvs = singleRes.get(i).rawCells();
189      Cell[] multiKvs = multiRes[i].rawCells();
190      for (int j = 0; j < singleKvs.length; j++) {
191        assertEquals(singleKvs[j], multiKvs[j]);
192        assertEquals(0,
193          Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), CellUtil.cloneValue(multiKvs[j])));
194      }
195    }
196    table.close();
197  }
198
199  @Test
200  public void testBadFam() throws Exception {
201    LOG.info("test=testBadFam");
202    Table table = UTIL.getConnection().getTable(TEST_TABLE);
203
204    List<Row> actions = new ArrayList<>();
205    Put p = new Put(Bytes.toBytes("row1"));
206    p.addColumn(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
207    actions.add(p);
208    p = new Put(Bytes.toBytes("row2"));
209    p.addColumn(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
210    actions.add(p);
211
212    // row1 and row2 should be in the same region.
213
214    Object[] r = new Object[actions.size()];
215    try {
216      table.batch(actions, r);
217      fail();
218    } catch (RetriesExhaustedException ex) {
219      // expected
220    }
221    assertEquals(2, r.length);
222    assertTrue(r[0] instanceof Throwable);
223    assertTrue(r[1] instanceof Result);
224    table.close();
225  }
226
  /** Runs {@link #doTestFlushCommits(boolean)} without aborting a region server. */
  @Test
  public void testFlushCommitsNoAbort() throws Exception {
    LOG.info("test=testFlushCommitsNoAbort");
    doTestFlushCommits(false);
  }
232
  /**
   * Runs {@link #doTestFlushCommits(boolean)} with a forced RegionServer abort.
   * <p>
   * Only run one Multi test with a forced RegionServer abort. Otherwise, the unit tests will take
   * an unnecessarily long time to run.
   */
  @Test
  public void testFlushCommitsWithAbort() throws Exception {
    LOG.info("test=testFlushCommitsWithAbort");
    doTestFlushCommits(true);
  }
242
243  /**
244   * Set table auto flush to false and test flushing commits
245   * @param doAbort true if abort one regionserver in the testing
246   */
247  private void doTestFlushCommits(boolean doAbort) throws Exception {
248    // Load the data
249    LOG.info("get new table");
250    Table table = UTIL.getConnection().getTable(TEST_TABLE);
251
252    LOG.info("constructPutRequests");
253    List<Put> puts = constructPutRequests();
254    table.put(puts);
255    LOG.info("puts");
256    final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
257    assert liveRScount > 0;
258    JVMClusterUtil.RegionServerThread liveRS =
259      UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
260    if (doAbort) {
261      liveRS.getRegionServer().abort("Aborting for tests", new Exception("doTestFlushCommits"));
262      // If we wait for no regions being online after we abort the server, we
263      // could ensure the master has re-assigned the regions on killed server
264      // after writing successfully. It means the server we aborted is dead
265      // and detected by matser
266      while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
267        Thread.sleep(100);
268      }
269      // try putting more keys after the abort. same key/qual... just validating
270      // no exceptions thrown
271      puts = constructPutRequests();
272      table.put(puts);
273    }
274
275    LOG.info("validating loaded data");
276    validateLoadedData(table);
277
278    // Validate server and region count
279    List<JVMClusterUtil.RegionServerThread> liveRSs =
280      UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
281    int count = 0;
282    for (JVMClusterUtil.RegionServerThread t : liveRSs) {
283      count++;
284      LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
285    }
286    LOG.info("Count=" + count);
287    assertEquals((doAbort ? (liveRScount - 1) : liveRScount), count,
288      "Server count=" + count + ", abort=" + doAbort);
289    if (doAbort) {
290      UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
291      UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
292        @Override
293        public boolean evaluate() throws Exception {
294          // We disable regions on master so the count should be liveRScount - 1
295          return UTIL.getMiniHBaseCluster().getMaster().getClusterMetrics().getLiveServerMetrics()
296            .size() == liveRScount - 1;
297        }
298      });
299      UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
300    }
301
302    table.close();
303    LOG.info("done");
304  }
305
306  @Test
307  public void testBatchWithPut() throws Exception {
308    LOG.info("test=testBatchWithPut");
309    Table table = CONNECTION.getTable(TEST_TABLE);
310    // put multiple rows using a batch
311    List<Put> puts = constructPutRequests();
312
313    Object[] results = new Object[puts.size()];
314    table.batch(puts, results);
315    validateSizeAndEmpty(results, KEYS.length);
316
317    if (true) {
318      int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
319      assert liveRScount > 0;
320      JVMClusterUtil.RegionServerThread liveRS =
321        UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
322      liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
323      puts = constructPutRequests();
324      try {
325        results = new Object[puts.size()];
326        table.batch(puts, results);
327      } catch (RetriesExhaustedWithDetailsException ree) {
328        LOG.info(ree.getExhaustiveDescription());
329        table.close();
330        throw ree;
331      }
332      validateSizeAndEmpty(results, KEYS.length);
333    }
334
335    validateLoadedData(table);
336    table.close();
337  }
338
339  @Test
340  public void testBatchWithDelete() throws Exception {
341    LOG.info("test=testBatchWithDelete");
342    Table table = UTIL.getConnection().getTable(TEST_TABLE);
343
344    // Load some data
345    List<Put> puts = constructPutRequests();
346    Object[] results = new Object[puts.size()];
347    table.batch(puts, results);
348    validateSizeAndEmpty(results, KEYS.length);
349
350    // Deletes
351    List<Row> deletes = new ArrayList<>();
352    for (int i = 0; i < KEYS.length; i++) {
353      Delete delete = new Delete(KEYS[i]);
354      delete.addFamily(BYTES_FAMILY);
355      deletes.add(delete);
356    }
357    results = new Object[deletes.size()];
358    table.batch(deletes, results);
359    validateSizeAndEmpty(results, KEYS.length);
360
361    // Get to make sure ...
362    for (byte[] k : KEYS) {
363      Get get = new Get(k);
364      get.addColumn(BYTES_FAMILY, QUALIFIER);
365      assertFalse(table.exists(get));
366    }
367    table.close();
368  }
369
370  @Test
371  public void testHTableDeleteWithList() throws Exception {
372    LOG.info("test=testHTableDeleteWithList");
373    Table table = UTIL.getConnection().getTable(TEST_TABLE);
374
375    // Load some data
376    List<Put> puts = constructPutRequests();
377    Object[] results = new Object[puts.size()];
378    table.batch(puts, results);
379    validateSizeAndEmpty(results, KEYS.length);
380
381    // Deletes
382    ArrayList<Delete> deletes = new ArrayList<>();
383    for (int i = 0; i < KEYS.length; i++) {
384      Delete delete = new Delete(KEYS[i]);
385      delete.addFamily(BYTES_FAMILY);
386      deletes.add(delete);
387    }
388    table.delete(deletes);
389
390    // Get to make sure ...
391    for (byte[] k : KEYS) {
392      Get get = new Get(k);
393      get.addColumn(BYTES_FAMILY, QUALIFIER);
394      assertFalse(table.exists(get));
395    }
396    table.close();
397  }
398
399  @Test
400  public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
401    LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
402    Table table = UTIL.getConnection().getTable(TEST_TABLE);
403
404    List<Row> puts = new ArrayList<>();
405    for (int i = 0; i < 100; i++) {
406      Put put = new Put(ONE_ROW);
407      byte[] qual = Bytes.toBytes("column" + i);
408      put.addColumn(BYTES_FAMILY, qual, VALUE);
409      puts.add(put);
410    }
411    Object[] results = new Object[puts.size()];
412    table.batch(puts, results);
413
414    // validate
415    validateSizeAndEmpty(results, 100);
416
417    // get the data back and validate that it is correct
418    List<Row> gets = new ArrayList<>();
419    for (int i = 0; i < 100; i++) {
420      Get get = new Get(ONE_ROW);
421      byte[] qual = Bytes.toBytes("column" + i);
422      get.addColumn(BYTES_FAMILY, qual);
423      gets.add(get);
424    }
425
426    Object[] multiRes = new Object[gets.size()];
427    table.batch(gets, multiRes);
428
429    int idx = 0;
430    for (Object r : multiRes) {
431      byte[] qual = Bytes.toBytes("column" + idx);
432      validateResult(r, qual, VALUE);
433      idx++;
434    }
435    table.close();
436  }
437
438  @Test
439  public void testBatchWithIncrementAndAppend() throws Exception {
440    LOG.info("test=testBatchWithIncrementAndAppend");
441    final byte[] QUAL1 = Bytes.toBytes("qual1");
442    final byte[] QUAL2 = Bytes.toBytes("qual2");
443    final byte[] QUAL3 = Bytes.toBytes("qual3");
444    final byte[] QUAL4 = Bytes.toBytes("qual4");
445    Table table = UTIL.getConnection().getTable(TEST_TABLE);
446    Delete d = new Delete(ONE_ROW);
447    table.delete(d);
448    Put put = new Put(ONE_ROW);
449    put.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
450    put.addColumn(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
451    table.put(put);
452
453    Increment inc = new Increment(ONE_ROW);
454    inc.addColumn(BYTES_FAMILY, QUAL2, 1);
455    inc.addColumn(BYTES_FAMILY, QUAL3, 1);
456
457    Append a = new Append(ONE_ROW);
458    a.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
459    a.addColumn(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
460    List<Row> actions = new ArrayList<>();
461    actions.add(inc);
462    actions.add(a);
463
464    Object[] multiRes = new Object[actions.size()];
465    table.batch(actions, multiRes);
466    validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
467    validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
468    validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
469    validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
470    table.close();
471  }
472
473  @Test
474  public void testBatchWithMixedActions() throws Exception {
475    LOG.info("test=testBatchWithMixedActions");
476    Table table = UTIL.getConnection().getTable(TEST_TABLE);
477
478    // Load some data to start
479    List<Put> puts = constructPutRequests();
480    Object[] results = new Object[puts.size()];
481    table.batch(puts, results);
482    validateSizeAndEmpty(results, KEYS.length);
483
484    // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
485    // put
486    List<Row> actions = new ArrayList<>();
487
488    byte[] qual2 = Bytes.toBytes("qual2");
489    byte[] val2 = Bytes.toBytes("putvalue2");
490
491    // 0 get
492    Get get = new Get(KEYS[10]);
493    get.addColumn(BYTES_FAMILY, QUALIFIER);
494    actions.add(get);
495
496    // 1 get
497    get = new Get(KEYS[11]);
498    get.addColumn(BYTES_FAMILY, QUALIFIER);
499    actions.add(get);
500
501    // 2 put of new column
502    Put put = new Put(KEYS[10]);
503    put.addColumn(BYTES_FAMILY, qual2, val2);
504    actions.add(put);
505
506    // 3 delete
507    Delete delete = new Delete(KEYS[20]);
508    delete.addFamily(BYTES_FAMILY);
509    actions.add(delete);
510
511    // 4 get
512    get = new Get(KEYS[30]);
513    get.addColumn(BYTES_FAMILY, QUALIFIER);
514    actions.add(get);
515
516    // There used to be a 'get' of a previous put here, but removed
517    // since this API really cannot guarantee order in terms of mixed
518    // get/puts.
519
520    // 5 put of new column
521    put = new Put(KEYS[40]);
522    put.addColumn(BYTES_FAMILY, qual2, val2);
523    actions.add(put);
524
525    // 6 RowMutations
526    RowMutations rm = new RowMutations(KEYS[50]);
527    put = new Put(KEYS[50]);
528    put.addColumn(BYTES_FAMILY, qual2, val2);
529    rm.add((Mutation) put);
530    byte[] qual3 = Bytes.toBytes("qual3");
531    byte[] val3 = Bytes.toBytes("putvalue3");
532    put = new Put(KEYS[50]);
533    put.addColumn(BYTES_FAMILY, qual3, val3);
534    rm.add((Mutation) put);
535    actions.add(rm);
536
537    // 7 Add another Get to the mixed sequence after RowMutations
538    get = new Get(KEYS[10]);
539    get.addColumn(BYTES_FAMILY, QUALIFIER);
540    actions.add(get);
541
542    results = new Object[actions.size()];
543    table.batch(actions, results);
544
545    // Validation
546
547    validateResult(results[0]);
548    validateResult(results[1]);
549    validateEmpty(results[3]);
550    validateResult(results[4]);
551    validateEmpty(results[5]);
552    validateEmpty(results[6]);
553    validateResult(results[7]);
554
555    // validate last put, externally from the batch
556    get = new Get(KEYS[40]);
557    get.addColumn(BYTES_FAMILY, qual2);
558    Result r = table.get(get);
559    validateResult(r, qual2, val2);
560
561    // validate last RowMutations, externally from the batch
562    get = new Get(KEYS[50]);
563    get.addColumn(BYTES_FAMILY, qual2);
564    r = table.get(get);
565    validateResult(r, qual2, val2);
566
567    get = new Get(KEYS[50]);
568    get.addColumn(BYTES_FAMILY, qual3);
569    r = table.get(get);
570    validateResult(r, qual3, val3);
571
572    table.close();
573  }
574
575  // // Helper methods ////
576
577  private void validateResult(Object r) {
578    validateResult(r, QUALIFIER, VALUE);
579  }
580
581  private void validateResult(Object r1, byte[] qual, byte[] val) {
582    Result r = (Result) r1;
583    assertTrue(r.containsColumn(BYTES_FAMILY, qual));
584    byte[] value = r.getValue(BYTES_FAMILY, qual);
585    if (0 != Bytes.compareTo(val, value)) {
586      fail("Expected [" + Bytes.toStringBinary(val) + "] but got [" + Bytes.toStringBinary(value)
587        + "]");
588    }
589  }
590
591  private List<Put> constructPutRequests() {
592    List<Put> puts = new ArrayList<>();
593    for (byte[] k : KEYS) {
594      Put put = new Put(k);
595      put.addColumn(BYTES_FAMILY, QUALIFIER, VALUE);
596      puts.add(put);
597    }
598    return puts;
599  }
600
601  private void validateLoadedData(Table table) throws IOException {
602    // get the data back and validate that it is correct
603    LOG.info("Validating data on " + table);
604    List<Get> gets = new ArrayList<>();
605    for (byte[] k : KEYS) {
606      Get get = new Get(k);
607      get.addColumn(BYTES_FAMILY, QUALIFIER);
608      gets.add(get);
609    }
610    int retryNum = 10;
611    Result[] results = null;
612    do {
613      results = table.get(gets);
614      boolean finished = true;
615      for (Result result : results) {
616        if (result.isEmpty()) {
617          finished = false;
618          break;
619        }
620      }
621      if (finished) {
622        break;
623      }
624      try {
625        Thread.sleep(10);
626      } catch (InterruptedException e) {
627      }
628      retryNum--;
629    } while (retryNum > 0);
630
631    if (retryNum == 0) {
632      fail("Timeout for validate data");
633    } else {
634      if (results != null) {
635        for (Result r : results) {
636          assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
637          assertEquals(0, Bytes.compareTo(VALUE, r.getValue(BYTES_FAMILY, QUALIFIER)));
638        }
639        LOG.info("Validating data on " + table + " successfully!");
640      }
641    }
642  }
643
644  private void validateEmpty(Object r1) {
645    Result result = (Result) r1;
646    assertTrue(result != null);
647    assertTrue(result.isEmpty());
648  }
649
650  private void validateSizeAndEmpty(Object[] results, int expectedSize) {
651    // Validate got back the same number of Result objects, all empty
652    assertEquals(expectedSize, results.length);
653    for (Object result : results) {
654      validateEmpty(result);
655    }
656  }
657
658  public static class MyMasterObserver implements MasterObserver, MasterCoprocessor {
659    private static final AtomicInteger postBalanceCount = new AtomicInteger(0);
660    private static final AtomicBoolean start = new AtomicBoolean(false);
661
662    @Override
663    public void start(CoprocessorEnvironment env) throws IOException {
664      start.set(true);
665    }
666
667    @Override
668    public Optional<MasterObserver> getMasterObserver() {
669      return Optional.of(this);
670    }
671
672    @Override
673    public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx,
674      BalanceRequest request, List<RegionPlan> plans) throws IOException {
675      if (!plans.isEmpty()) {
676        postBalanceCount.incrementAndGet();
677      }
678    }
679  }
680}