/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

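/**
 * Tests client batch ("multi") operations against a multi-region table on a mini cluster: gets,
 * puts, deletes, increments, appends, and RowMutations submitted through
 * {@link Table#batch(List, Object[])}, including behaviour when a region server is aborted
 * mid-test.
 */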
@Category({ MediumTests.class, FlakeyTests.class })
public class TestMultiParallel {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMultiParallel.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final byte[] VALUE = Bytes.toBytes("value");
  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
  private static final String FAMILY = "family";
  private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
  private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
  private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
  private static final byte[][] KEYS = makeKeys();

  private static final int slaves = 5; // also used for testing HTable pool size
  private static Connection CONNECTION;

  @BeforeClass
  public static void beforeClass() throws Exception {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
    UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
      KeyValueCodec.class.getCanonicalName());
    // Disable table on master for now as the feature is broken
    // UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
    // We used to ask for system tables on the master exclusively, but the test does not need it
    // and it does not work anyway, so it is commented out.
    // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
    UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
      MyMasterObserver.class.getName());
    UTIL.startMiniCluster(slaves);
    Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
    UTIL.waitTableEnabled(TEST_TABLE);
    t.close();
    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
    assertTrue(MyMasterObserver.start.get());
  }

  @AfterClass
  public static void afterClass() throws Exception {
    CONNECTION.close();
    UTIL.shutdownMiniCluster();
  }

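  /**
   * Before each test, trigger a balance and wait until all regions of the test table are assigned
   * again, so every test starts from an evenly distributed, fully online table.
   */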
  @Before
  public void before() throws Exception {
    final int balanceCount = MyMasterObserver.postBalanceCount.get();
    LOG.info("before");
    if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
      // Distribute regions
      UTIL.getMiniHBaseCluster().getMaster().balance();
      // Some plans are created.
      if (MyMasterObserver.postBalanceCount.get() > balanceCount) {
        // We must wait for the move procedures to start; otherwise, the next wait may pass
        // immediately.
        UTIL.waitFor(3 * 1000, 100, false, () -> UTIL.getMiniHBaseCluster().getMaster()
          .getAssignmentManager().hasRegionsInTransition());
      }

      // Wait until the balance completes
      UTIL.waitUntilAllRegionsAssigned(TEST_TABLE);
    }
    LOG.info("before done");
  }

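  /**
   * Builds the row keys used by the tests: a "non-uniform" set derived from
   * {@link HBaseTestingUtil#KEYS} with an unequal number of keys per region, duplicate keys, and
   * keys that are not sorted within a region.
   */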
  private static byte[][] makeKeys() {
    byte[][] starterKeys = HBaseTestingUtil.KEYS;
    // Create a "non-uniform" test set with the following characteristics:
    // a) Unequal number of keys per region

    // Don't use an integer multiplier, so that the number of keys is not a multiple of the
    // number of regions
    int numKeys = (int) (starterKeys.length * 10.33F);

    List<byte[]> keys = new ArrayList<>();
    for (int i = 0; i < numKeys; i++) {
      int kIdx = i % starterKeys.length;
      byte[] k = starterKeys[kIdx];
      byte[] cp = new byte[k.length + 1];
      System.arraycopy(k, 0, cp, 0, k.length);
      cp[k.length] = (byte) (i % 256);
      keys.add(cp);
    }

    // b) Some duplicate keys (showing multiple Gets/Puts to the same row, which
    // should work)
    // c) keys are not in sorted order (within a region), to ensure that the
    // sorting code and index mapping doesn't break the functionality
    for (int i = 0; i < 100; i++) {
      int kIdx = i % starterKeys.length;
      byte[] k = starterKeys[kIdx];
      byte[] cp = new byte[k.length + 1];
      System.arraycopy(k, 0, cp, 0, k.length);
      cp[k.length] = (byte) (i % 256);
      keys.add(cp);
    }
    return keys.toArray(new byte[][] { new byte[] {} });
  }

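  /**
   * Loads the table with a batch of Puts, reads every key back with a batch of Gets, and checks
   * that the batched results match the results of individual {@link Table#get(Get)} calls.
   */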
  @Test
  public void testBatchWithGet() throws Exception {
    LOG.info("test=testBatchWithGet");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    // load test data
    List<Put> puts = constructPutRequests();
    table.batch(puts, null);

    // create a list of gets and run it
    List<Row> gets = new ArrayList<>();
    for (byte[] k : KEYS) {
      Get get = new Get(k);
      get.addColumn(BYTES_FAMILY, QUALIFIER);
      gets.add(get);
    }
    Result[] multiRes = new Result[gets.size()];
    table.batch(gets, multiRes);

    // Same gets using individual call API
    List<Result> singleRes = new ArrayList<>();
    for (Row get : gets) {
      singleRes.add(table.get((Get) get));
    }
    // Compare results
    Assert.assertEquals(singleRes.size(), multiRes.length);
    for (int i = 0; i < singleRes.size(); i++) {
      Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
      Cell[] singleKvs = singleRes.get(i).rawCells();
      Cell[] multiKvs = multiRes[i].rawCells();
      for (int j = 0; j < singleKvs.length; j++) {
        Assert.assertEquals(singleKvs[j], multiKvs[j]);
        Assert.assertEquals(0,
          Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), CellUtil.cloneValue(multiKvs[j])));
      }
    }
    table.close();
  }

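  /**
   * Submits a batch containing one Put against a nonexistent column family and one valid Put, and
   * verifies that the batch fails with {@link RetriesExhaustedException} while still reporting a
   * per-action result: a Throwable for the bad Put and a Result for the good one.
   */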
  @Test
  public void testBadFam() throws Exception {
    LOG.info("test=testBadFam");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    List<Row> actions = new ArrayList<>();
    Put p = new Put(Bytes.toBytes("row1"));
    p.addColumn(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
    actions.add(p);
    p = new Put(Bytes.toBytes("row2"));
    p.addColumn(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
    actions.add(p);

    // row1 and row2 should be in the same region.

    Object[] r = new Object[actions.size()];
    try {
      table.batch(actions, r);
      fail();
    } catch (RetriesExhaustedException ex) {
      // expected
    }
    assertEquals(2, r.length);
    assertTrue(r[0] instanceof Throwable);
    assertTrue(r[1] instanceof Result);
    table.close();
  }

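  /**
   * Runs {@link #doTestFlushCommits(boolean)} without aborting a region server.
   */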
  @Test
  public void testFlushCommitsNoAbort() throws Exception {
    LOG.info("test=testFlushCommitsNoAbort");
    doTestFlushCommits(false);
  }

  /**
   * Only run one Multi test with a forced RegionServer abort. Otherwise, the unit tests will take
   * an unnecessarily long time to run.
   */
  @Test
  public void testFlushCommitsWithAbort() throws Exception {
    LOG.info("test=testFlushCommitsWithAbort");
    doTestFlushCommits(true);
  }

  /**
   * Loads the table with puts and validates the data, optionally aborting one region server
   * mid-test to verify that writes still succeed after its regions are re-assigned.
   * @param doAbort true to abort one region server during the test
   */
  private void doTestFlushCommits(boolean doAbort) throws Exception {
    // Load the data
    LOG.info("get new table");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    LOG.info("constructPutRequests");
    List<Put> puts = constructPutRequests();
    table.put(puts);
    LOG.info("puts");
    final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
    assert liveRScount > 0;
    JVMClusterUtil.RegionServerThread liveRS =
      UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
    if (doAbort) {
      liveRS.getRegionServer().abort("Aborting for tests", new Exception("doTestFlushCommits"));
      // Waiting until the aborted server has no regions online ensures the master has detected
      // the dead server and re-assigned its regions after the writes succeeded.
      while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
        Thread.sleep(100);
      }
      // Try putting more keys after the abort. Same keys/qualifiers; just validating that no
      // exceptions are thrown.
      puts = constructPutRequests();
      table.put(puts);
    }

    LOG.info("validating loaded data");
    validateLoadedData(table);

    // Validate server and region count
    List<JVMClusterUtil.RegionServerThread> liveRSs =
      UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
    int count = 0;
    for (JVMClusterUtil.RegionServerThread t : liveRSs) {
      count++;
      LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
    }
    LOG.info("Count=" + count);
    Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
      (doAbort ? (liveRScount - 1) : liveRScount), count);
    if (doAbort) {
      UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
      UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          // We disable regions on master so the count should be liveRScount - 1
          return UTIL.getMiniHBaseCluster().getMaster().getClusterMetrics().getLiveServerMetrics()
            .size() == liveRScount - 1;
        }
      });
      UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
    }

    table.close();
    LOG.info("done");
  }

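  /**
   * Puts all test rows via {@link Table#batch(List, Object[])}, then aborts one region server and
   * batches the same puts again, verifying the writes still succeed and the data is intact.
   */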
  @Test
  public void testBatchWithPut() throws Exception {
    LOG.info("test=testBatchWithPut");
    Table table = CONNECTION.getTable(TEST_TABLE);
    // put multiple rows using a batch
    List<Put> puts = constructPutRequests();

    Object[] results = new Object[puts.size()];
    table.batch(puts, results);
    validateSizeAndEmpty(results, KEYS.length);

    // Abort a region server and batch the same puts again; the writes should still succeed.
    int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
    assert liveRScount > 0;
    JVMClusterUtil.RegionServerThread liveRS =
      UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
    liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
    puts = constructPutRequests();
    try {
      results = new Object[puts.size()];
      table.batch(puts, results);
    } catch (RetriesExhaustedWithDetailsException ree) {
      LOG.info(ree.getExhaustiveDescription());
      table.close();
      throw ree;
    }
    validateSizeAndEmpty(results, KEYS.length);

    validateLoadedData(table);
    table.close();
  }

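  /**
   * Loads data with a batch of Puts, deletes every row with a batch of Deletes, and verifies that
   * none of the rows exist afterwards.
   */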
  @Test
  public void testBatchWithDelete() throws Exception {
    LOG.info("test=testBatchWithDelete");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    // Load some data
    List<Put> puts = constructPutRequests();
    Object[] results = new Object[puts.size()];
    table.batch(puts, results);
    validateSizeAndEmpty(results, KEYS.length);

    // Deletes
    List<Row> deletes = new ArrayList<>();
    for (int i = 0; i < KEYS.length; i++) {
      Delete delete = new Delete(KEYS[i]);
      delete.addFamily(BYTES_FAMILY);
      deletes.add(delete);
    }
    results = new Object[deletes.size()];
    table.batch(deletes, results);
    validateSizeAndEmpty(results, KEYS.length);

    // Get to make sure ...
    for (byte[] k : KEYS) {
      Get get = new Get(k);
      get.addColumn(BYTES_FAMILY, QUALIFIER);
      Assert.assertFalse(table.exists(get));
    }
    table.close();
  }

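  /**
   * Same as {@link #testBatchWithDelete()} but issues the deletes through
   * {@link Table#delete(List)} instead of a generic batch call.
   */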
  @Test
  public void testHTableDeleteWithList() throws Exception {
    LOG.info("test=testHTableDeleteWithList");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    // Load some data
    List<Put> puts = constructPutRequests();
    Object[] results = new Object[puts.size()];
    table.batch(puts, results);
    validateSizeAndEmpty(results, KEYS.length);

    // Deletes
    ArrayList<Delete> deletes = new ArrayList<>();
    for (int i = 0; i < KEYS.length; i++) {
      Delete delete = new Delete(KEYS[i]);
      delete.addFamily(BYTES_FAMILY);
      deletes.add(delete);
    }
    table.delete(deletes);

    // Get to make sure ...
    for (byte[] k : KEYS) {
      Get get = new Get(k);
      get.addColumn(BYTES_FAMILY, QUALIFIER);
      Assert.assertFalse(table.exists(get));
    }
    table.close();
  }

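  /**
   * Batches 100 Puts against different columns of a single row, then batches 100 Gets for those
   * columns and validates each returned value.
   */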
  @Test
  public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
    LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    List<Row> puts = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      Put put = new Put(ONE_ROW);
      byte[] qual = Bytes.toBytes("column" + i);
      put.addColumn(BYTES_FAMILY, qual, VALUE);
      puts.add(put);
    }
    Object[] results = new Object[puts.size()];
    table.batch(puts, results);

    // validate
    validateSizeAndEmpty(results, 100);

    // get the data back and validate that it is correct
    List<Row> gets = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      Get get = new Get(ONE_ROW);
      byte[] qual = Bytes.toBytes("column" + i);
      get.addColumn(BYTES_FAMILY, qual);
      gets.add(get);
    }

    Object[] multiRes = new Object[gets.size()];
    table.batch(gets, multiRes);

    int idx = 0;
    for (Object r : multiRes) {
      byte[] qual = Bytes.toBytes("column" + idx);
      validateResult(r, qual, VALUE);
      idx++;
    }
    table.close();
  }

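  /**
   * Batches an Increment and an Append against the same row and validates the incremented
   * counters and appended values in the returned Results.
   */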
  @Test
  public void testBatchWithIncrementAndAppend() throws Exception {
    LOG.info("test=testBatchWithIncrementAndAppend");
    final byte[] QUAL1 = Bytes.toBytes("qual1");
    final byte[] QUAL2 = Bytes.toBytes("qual2");
    final byte[] QUAL3 = Bytes.toBytes("qual3");
    final byte[] QUAL4 = Bytes.toBytes("qual4");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);
    Delete d = new Delete(ONE_ROW);
    table.delete(d);
    Put put = new Put(ONE_ROW);
    put.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
    put.addColumn(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
    table.put(put);

    Increment inc = new Increment(ONE_ROW);
    inc.addColumn(BYTES_FAMILY, QUAL2, 1);
    inc.addColumn(BYTES_FAMILY, QUAL3, 1);

    Append a = new Append(ONE_ROW);
    a.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
    a.addColumn(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
    List<Row> actions = new ArrayList<>();
    actions.add(inc);
    actions.add(a);

    Object[] multiRes = new Object[actions.size()];
    table.batch(actions, multiRes);
    validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
    validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
    validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
    validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
    table.close();
  }

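  /**
   * Mixes Gets, Puts, a Delete, and a RowMutations in a single batch and validates each result
   * slot, then re-reads the mutated rows outside the batch to confirm the writes took effect.
   */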
  @Test
  public void testBatchWithMixedActions() throws Exception {
    LOG.info("test=testBatchWithMixedActions");
    Table table = UTIL.getConnection().getTable(TEST_TABLE);

    // Load some data to start
    List<Put> puts = constructPutRequests();
    Object[] results = new Object[puts.size()];
    table.batch(puts, results);
    validateSizeAndEmpty(results, KEYS.length);

    // Batch: get, get, put(new col), delete, get, put(new col), RowMutations, get
    List<Row> actions = new ArrayList<>();

    byte[] qual2 = Bytes.toBytes("qual2");
    byte[] val2 = Bytes.toBytes("putvalue2");

    // 0 get
    Get get = new Get(KEYS[10]);
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    // 1 get
    get = new Get(KEYS[11]);
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    // 2 put of new column
    Put put = new Put(KEYS[10]);
    put.addColumn(BYTES_FAMILY, qual2, val2);
    actions.add(put);

    // 3 delete
    Delete delete = new Delete(KEYS[20]);
    delete.addFamily(BYTES_FAMILY);
    actions.add(delete);

    // 4 get
    get = new Get(KEYS[30]);
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    // There used to be a 'get' of a previous put here, but it was removed since this API
    // cannot guarantee ordering among mixed gets and puts.

    // 5 put of new column
    put = new Put(KEYS[40]);
    put.addColumn(BYTES_FAMILY, qual2, val2);
    actions.add(put);

    // 6 RowMutations
    RowMutations rm = new RowMutations(KEYS[50]);
    put = new Put(KEYS[50]);
    put.addColumn(BYTES_FAMILY, qual2, val2);
    rm.add((Mutation) put);
    byte[] qual3 = Bytes.toBytes("qual3");
    byte[] val3 = Bytes.toBytes("putvalue3");
    put = new Put(KEYS[50]);
    put.addColumn(BYTES_FAMILY, qual3, val3);
    rm.add((Mutation) put);
    actions.add(rm);

    // 7 Add another Get to the mixed sequence after RowMutations
    get = new Get(KEYS[10]);
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    results = new Object[actions.size()];
    table.batch(actions, results);

    // Validation

    validateResult(results[0]);
    validateResult(results[1]);
    validateEmpty(results[3]);
    validateResult(results[4]);
    validateEmpty(results[5]);
    validateEmpty(results[6]);
    validateResult(results[7]);

    // validate last put, externally from the batch
    get = new Get(KEYS[40]);
    get.addColumn(BYTES_FAMILY, qual2);
    Result r = table.get(get);
    validateResult(r, qual2, val2);

    // validate last RowMutations, externally from the batch
    get = new Get(KEYS[50]);
    get.addColumn(BYTES_FAMILY, qual2);
    r = table.get(get);
    validateResult(r, qual2, val2);

    get = new Get(KEYS[50]);
    get.addColumn(BYTES_FAMILY, qual3);
    r = table.get(get);
    validateResult(r, qual3, val3);

    table.close();
  }

  // // Helper methods ////

  private void validateResult(Object r) {
    validateResult(r, QUALIFIER, VALUE);
  }

  private void validateResult(Object r1, byte[] qual, byte[] val) {
    Result r = (Result) r1;
    Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
    byte[] value = r.getValue(BYTES_FAMILY, qual);
    if (0 != Bytes.compareTo(val, value)) {
      fail("Expected [" + Bytes.toStringBinary(val) + "] but got [" + Bytes.toStringBinary(value)
        + "]");
    }
  }

  private List<Put> constructPutRequests() {
    List<Put> puts = new ArrayList<>();
    for (byte[] k : KEYS) {
      Put put = new Put(k);
      put.addColumn(BYTES_FAMILY, QUALIFIER, VALUE);
      puts.add(put);
    }
    return puts;
  }

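  /**
   * Reads every key back with {@link Table#get(List)} and asserts each row holds VALUE under
   * BYTES_FAMILY:QUALIFIER, retrying briefly while any result is still empty.
   */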
  private void validateLoadedData(Table table) throws IOException {
    // get the data back and validate that it is correct
    LOG.info("Validating data on " + table);
    List<Get> gets = new ArrayList<>();
    for (byte[] k : KEYS) {
      Get get = new Get(k);
      get.addColumn(BYTES_FAMILY, QUALIFIER);
      gets.add(get);
    }
    int retryNum = 10;
    Result[] results = null;
    do {
      results = table.get(gets);
      boolean finished = true;
      for (Result result : results) {
        if (result.isEmpty()) {
          finished = false;
          break;
        }
      }
      if (finished) {
        break;
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        // Ignore and retry.
      }
      retryNum--;
    } while (retryNum > 0);

    if (retryNum == 0) {
      fail("Timed out while validating data");
    } else {
      if (results != null) {
        for (Result r : results) {
          Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
          Assert.assertEquals(0, Bytes.compareTo(VALUE, r.getValue(BYTES_FAMILY, QUALIFIER)));
        }
        LOG.info("Validated data on " + table + " successfully!");
      }
    }
  }

  private void validateEmpty(Object r1) {
    Result result = (Result) r1;
    Assert.assertNotNull(result);
    Assert.assertTrue(result.isEmpty());
  }

  private void validateSizeAndEmpty(Object[] results, int expectedSize) {
    // Validate that we got back the same number of Result objects, all empty
    Assert.assertEquals(expectedSize, results.length);
    for (Object result : results) {
      validateEmpty(result);
    }
  }

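  /**
   * Master coprocessor installed in beforeClass(); records that it was started and counts
   * balancer runs that produced region plans, which before() uses to decide whether to wait for
   * regions in transition.
   */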
  public static class MyMasterObserver implements MasterObserver, MasterCoprocessor {
    private static final AtomicInteger postBalanceCount = new AtomicInteger(0);
    private static final AtomicBoolean start = new AtomicBoolean(false);

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
      start.set(true);
    }

    @Override
    public Optional<MasterObserver> getMasterObserver() {
      return Optional.of(this);
    }

    @Override
    public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx,
      BalanceRequest request, List<RegionPlan> plans) throws IOException {
      if (!plans.isEmpty()) {
        postBalanceCount.incrementAndGet();
      }
    }
  }
}