001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Optional;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.CoprocessorEnvironment;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.Waiter;
044import org.apache.hadoop.hbase.codec.KeyValueCodec;
045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
046import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
047import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
048import org.apache.hadoop.hbase.coprocessor.MasterObserver;
049import org.apache.hadoop.hbase.coprocessor.ObserverContext;
050import org.apache.hadoop.hbase.master.RegionPlan;
051import org.apache.hadoop.hbase.testclassification.FlakeyTests;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.JVMClusterUtil;
055import org.apache.hadoop.hbase.util.Threads;
056import org.junit.AfterClass;
057import org.junit.Assert;
058import org.junit.Before;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066@Category({MediumTests.class, FlakeyTests.class})
067public class TestMultiParallel {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071      HBaseClassTestRule.forClass(TestMultiParallel.class);
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class);
074
075  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
076  private static final byte[] VALUE = Bytes.toBytes("value");
077  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
078  private static final String FAMILY = "family";
079  private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
080  private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
081  private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
082  private static final byte [][] KEYS = makeKeys();
083
084  private static final int slaves = 5; // also used for testing HTable pool size
085  private static Connection CONNECTION;
086
087  @BeforeClass
088  public static void beforeClass() throws Exception {
089    // Uncomment the following lines if more verbosity is needed for
090    // debugging (see HBASE-12285 for details).
091    //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
092    //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
093    //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
094    UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
095        KeyValueCodec.class.getCanonicalName());
096    // Disable table on master for now as the feature is broken
097    //UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
098    // We used to ask for system tables on Master exclusively but not needed by test and doesn't
099    // work anyways -- so commented out.
100    // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
101    UTIL.getConfiguration()
102        .set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyMasterObserver.class.getName());
103    UTIL.startMiniCluster(slaves);
104    Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
105    UTIL.waitTableEnabled(TEST_TABLE);
106    t.close();
107    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
108    assertTrue(MyMasterObserver.start.get());
109  }
110
111  @AfterClass
112  public static void afterClass() throws Exception {
113    CONNECTION.close();
114    UTIL.shutdownMiniCluster();
115  }
116
117  @Before
118  public void before() throws Exception {
119    final int balanceCount = MyMasterObserver.postBalanceCount.get();
120    LOG.info("before");
121    if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
122      // Distribute regions
123      UTIL.getMiniHBaseCluster().getMaster().balance();
124      // Some plans are created.
125      if (MyMasterObserver.postBalanceCount.get() > balanceCount) {
126        // It is necessary to wait the move procedure to start.
127        // Otherwise, the next wait may pass immediately.
128        UTIL.waitFor(3 * 1000, 100, false, () ->
129            UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition()
130        );
131      }
132
133      // Wait until completing balance
134      UTIL.waitUntilAllRegionsAssigned(TEST_TABLE);
135    }
136    LOG.info("before done");
137  }
138
139  private static byte[][] makeKeys() {
140    byte [][] starterKeys = HBaseTestingUtility.KEYS;
141    // Create a "non-uniform" test set with the following characteristics:
142    // a) Unequal number of keys per region
143
144    // Don't use integer as a multiple, so that we have a number of keys that is
145    // not a multiple of the number of regions
146    int numKeys = (int) (starterKeys.length * 10.33F);
147
148    List<byte[]> keys = new ArrayList<>();
149    for (int i = 0; i < numKeys; i++) {
150      int kIdx = i % starterKeys.length;
151      byte[] k = starterKeys[kIdx];
152      byte[] cp = new byte[k.length + 1];
153      System.arraycopy(k, 0, cp, 0, k.length);
154      cp[k.length] = new Integer(i % 256).byteValue();
155      keys.add(cp);
156    }
157
158    // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
159    // should work)
160    // c) keys are not in sorted order (within a region), to ensure that the
161    // sorting code and index mapping doesn't break the functionality
162    for (int i = 0; i < 100; i++) {
163      int kIdx = i % starterKeys.length;
164      byte[] k = starterKeys[kIdx];
165      byte[] cp = new byte[k.length + 1];
166      System.arraycopy(k, 0, cp, 0, k.length);
167      cp[k.length] = new Integer(i % 256).byteValue();
168      keys.add(cp);
169    }
170    return keys.toArray(new byte [][] {new byte [] {}});
171  }
172
173
174  /**
175   * This is for testing the active number of threads that were used while
176   * doing a batch operation. It inserts one row per region via the batch
177   * operation, and then checks the number of active threads.
178   * <p/>
179   * For HBASE-3553
180   */
181  @Test
182  public void testActiveThreadsCount() throws Exception {
183    UTIL.getConfiguration().setLong("hbase.htable.threads.coresize", slaves + 1);
184    // Make sure max is at least as big as coresize; can be smaller in test context where
185    // we tune down thread sizes -- max could be < slaves + 1.
186    UTIL.getConfiguration().setLong("hbase.htable.threads.max", slaves + 1);
187    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) {
188      ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration());
189      try {
190        try (Table t = connection.getTable(TEST_TABLE, executor)) {
191          List<Put> puts = constructPutRequests(); // creates a Put for every region
192          t.batch(puts, null);
193          HashSet<ServerName> regionservers = new HashSet<>();
194          try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) {
195            for (Row r : puts) {
196              HRegionLocation location = locator.getRegionLocation(r.getRow());
197              regionservers.add(location.getServerName());
198            }
199          }
200          assertEquals(regionservers.size(), executor.getLargestPoolSize());
201        }
202      } finally {
203        executor.shutdownNow();
204      }
205    }
206  }
207
208  @Test
209  public void testBatchWithGet() throws Exception {
210    LOG.info("test=testBatchWithGet");
211    Table table = UTIL.getConnection().getTable(TEST_TABLE);
212
213    // load test data
214    List<Put> puts = constructPutRequests();
215    table.batch(puts, null);
216
217    // create a list of gets and run it
218    List<Row> gets = new ArrayList<>();
219    for (byte[] k : KEYS) {
220      Get get = new Get(k);
221      get.addColumn(BYTES_FAMILY, QUALIFIER);
222      gets.add(get);
223    }
224    Result[] multiRes = new Result[gets.size()];
225    table.batch(gets, multiRes);
226
227    // Same gets using individual call API
228    List<Result> singleRes = new ArrayList<>();
229    for (Row get : gets) {
230      singleRes.add(table.get((Get) get));
231    }
232    // Compare results
233    Assert.assertEquals(singleRes.size(), multiRes.length);
234    for (int i = 0; i < singleRes.size(); i++) {
235      Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
236      Cell[] singleKvs = singleRes.get(i).rawCells();
237      Cell[] multiKvs = multiRes[i].rawCells();
238      for (int j = 0; j < singleKvs.length; j++) {
239        Assert.assertEquals(singleKvs[j], multiKvs[j]);
240        Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]),
241            CellUtil.cloneValue(multiKvs[j])));
242      }
243    }
244    table.close();
245  }
246
247  @Test
248  public void testBadFam() throws Exception {
249    LOG.info("test=testBadFam");
250    Table table = UTIL.getConnection().getTable(TEST_TABLE);
251
252    List<Row> actions = new ArrayList<>();
253    Put p = new Put(Bytes.toBytes("row1"));
254    p.addColumn(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
255    actions.add(p);
256    p = new Put(Bytes.toBytes("row2"));
257    p.addColumn(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
258    actions.add(p);
259
260    // row1 and row2 should be in the same region.
261
262    Object [] r = new Object[actions.size()];
263    try {
264      table.batch(actions, r);
265      fail();
266    } catch (RetriesExhaustedWithDetailsException ex) {
267      LOG.debug(ex.toString(), ex);
268      // good!
269      assertFalse(ex.mayHaveClusterIssues());
270    }
271    assertEquals(2, r.length);
272    assertTrue(r[0] instanceof Throwable);
273    assertTrue(r[1] instanceof Result);
274    table.close();
275  }
276
  /**
   * Runs {@link #doTestFlushCommits(boolean)} without aborting a region server.
   */
  @Test
  public void testFlushCommitsNoAbort() throws Exception {
    LOG.info("test=testFlushCommitsNoAbort");
    doTestFlushCommits(false);
  }
282
  /**
   * Runs {@link #doTestFlushCommits(boolean)} with a forced RegionServer abort.
   * <p>
   * Only run one Multi test with a forced RegionServer abort. Otherwise, the
   * unit tests will take an unnecessarily long time to run.
   */
  @Test
  public void testFlushCommitsWithAbort() throws Exception {
    LOG.info("test=testFlushCommitsWithAbort");
    doTestFlushCommits(true);
  }
292
293  /**
294   * Set table auto flush to false and test flushing commits
295   * @param doAbort true if abort one regionserver in the testing
296   */
297  private void doTestFlushCommits(boolean doAbort) throws Exception {
298    // Load the data
299    LOG.info("get new table");
300    Table table = UTIL.getConnection().getTable(TEST_TABLE);
301
302    LOG.info("constructPutRequests");
303    List<Put> puts = constructPutRequests();
304    table.put(puts);
305    LOG.info("puts");
306    final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
307        .size();
308    assert liveRScount > 0;
309    JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
310        .getLiveRegionServerThreads().get(0);
311    if (doAbort) {
312      liveRS.getRegionServer().abort("Aborting for tests",
313          new Exception("doTestFlushCommits"));
314      // If we wait for no regions being online after we abort the server, we
315      // could ensure the master has re-assigned the regions on killed server
316      // after writing successfully. It means the server we aborted is dead
317      // and detected by matser
318      while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
319        Thread.sleep(100);
320      }
321      // try putting more keys after the abort. same key/qual... just validating
322      // no exceptions thrown
323      puts = constructPutRequests();
324      table.put(puts);
325    }
326
327    LOG.info("validating loaded data");
328    validateLoadedData(table);
329
330    // Validate server and region count
331    List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
332    int count = 0;
333    for (JVMClusterUtil.RegionServerThread t: liveRSs) {
334      count++;
335      LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
336    }
337    LOG.info("Count=" + count);
338    Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
339        (doAbort ? (liveRScount - 1) : liveRScount), count);
340    if (doAbort) {
341      UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
342      UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
343        @Override
344        public boolean evaluate() throws Exception {
345          // We disable regions on master so the count should be liveRScount - 1
346          return UTIL.getMiniHBaseCluster().getMaster()
347              .getClusterMetrics().getLiveServerMetrics().size() == liveRScount - 1;
348        }
349      });
350      UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
351    }
352
353    table.close();
354    LOG.info("done");
355  }
356
357  @Test
358  public void testBatchWithPut() throws Exception {
359    LOG.info("test=testBatchWithPut");
360    Table table = CONNECTION.getTable(TEST_TABLE);
361    // put multiple rows using a batch
362    List<Put> puts = constructPutRequests();
363
364    Object[] results = new Object[puts.size()];
365    table.batch(puts, results);
366    validateSizeAndEmpty(results, KEYS.length);
367
368    if (true) {
369      int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
370      assert liveRScount > 0;
371      JVMClusterUtil.RegionServerThread liveRS =
372        UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
373      liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
374      puts = constructPutRequests();
375      try {
376        results = new Object[puts.size()];
377        table.batch(puts, results);
378      } catch (RetriesExhaustedWithDetailsException ree) {
379        LOG.info(ree.getExhaustiveDescription());
380        table.close();
381        throw ree;
382      }
383      validateSizeAndEmpty(results, KEYS.length);
384    }
385
386    validateLoadedData(table);
387    table.close();
388  }
389
390  @Test
391  public void testBatchWithDelete() throws Exception {
392    LOG.info("test=testBatchWithDelete");
393    Table table = UTIL.getConnection().getTable(TEST_TABLE);
394
395    // Load some data
396    List<Put> puts = constructPutRequests();
397    Object[] results = new Object[puts.size()];
398    table.batch(puts, results);
399    validateSizeAndEmpty(results, KEYS.length);
400
401    // Deletes
402    List<Row> deletes = new ArrayList<>();
403    for (int i = 0; i < KEYS.length; i++) {
404      Delete delete = new Delete(KEYS[i]);
405      delete.addFamily(BYTES_FAMILY);
406      deletes.add(delete);
407    }
408    results= new Object[deletes.size()];
409    table.batch(deletes, results);
410    validateSizeAndEmpty(results, KEYS.length);
411
412    // Get to make sure ...
413    for (byte[] k : KEYS) {
414      Get get = new Get(k);
415      get.addColumn(BYTES_FAMILY, QUALIFIER);
416      Assert.assertFalse(table.exists(get));
417    }
418    table.close();
419  }
420
421  @Test
422  public void testHTableDeleteWithList() throws Exception {
423    LOG.info("test=testHTableDeleteWithList");
424    Table table = UTIL.getConnection().getTable(TEST_TABLE);
425
426    // Load some data
427    List<Put> puts = constructPutRequests();
428    Object[] results = new Object[puts.size()];
429    table.batch(puts, results);
430    validateSizeAndEmpty(results, KEYS.length);
431
432    // Deletes
433    ArrayList<Delete> deletes = new ArrayList<>();
434    for (int i = 0; i < KEYS.length; i++) {
435      Delete delete = new Delete(KEYS[i]);
436      delete.addFamily(BYTES_FAMILY);
437      deletes.add(delete);
438    }
439    table.delete(deletes);
440    Assert.assertTrue(deletes.isEmpty());
441
442    // Get to make sure ...
443    for (byte[] k : KEYS) {
444      Get get = new Get(k);
445      get.addColumn(BYTES_FAMILY, QUALIFIER);
446      Assert.assertFalse(table.exists(get));
447    }
448    table.close();
449  }
450
451  @Test
452  public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
453    LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
454    Table table = UTIL.getConnection().getTable(TEST_TABLE);
455
456    List<Row> puts = new ArrayList<>();
457    for (int i = 0; i < 100; i++) {
458      Put put = new Put(ONE_ROW);
459      byte[] qual = Bytes.toBytes("column" + i);
460      put.addColumn(BYTES_FAMILY, qual, VALUE);
461      puts.add(put);
462    }
463    Object[] results = new Object[puts.size()];
464    table.batch(puts, results);
465
466    // validate
467    validateSizeAndEmpty(results, 100);
468
469    // get the data back and validate that it is correct
470    List<Row> gets = new ArrayList<>();
471    for (int i = 0; i < 100; i++) {
472      Get get = new Get(ONE_ROW);
473      byte[] qual = Bytes.toBytes("column" + i);
474      get.addColumn(BYTES_FAMILY, qual);
475      gets.add(get);
476    }
477
478    Object[] multiRes = new Object[gets.size()];
479    table.batch(gets, multiRes);
480
481    int idx = 0;
482    for (Object r : multiRes) {
483      byte[] qual = Bytes.toBytes("column" + idx);
484      validateResult(r, qual, VALUE);
485      idx++;
486    }
487    table.close();
488  }
489
490  @Test
491  public void testBatchWithIncrementAndAppend() throws Exception {
492    LOG.info("test=testBatchWithIncrementAndAppend");
493    final byte[] QUAL1 = Bytes.toBytes("qual1");
494    final byte[] QUAL2 = Bytes.toBytes("qual2");
495    final byte[] QUAL3 = Bytes.toBytes("qual3");
496    final byte[] QUAL4 = Bytes.toBytes("qual4");
497    Table table = UTIL.getConnection().getTable(TEST_TABLE);
498    Delete d = new Delete(ONE_ROW);
499    table.delete(d);
500    Put put = new Put(ONE_ROW);
501    put.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
502    put.addColumn(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
503    table.put(put);
504
505    Increment inc = new Increment(ONE_ROW);
506    inc.addColumn(BYTES_FAMILY, QUAL2, 1);
507    inc.addColumn(BYTES_FAMILY, QUAL3, 1);
508
509    Append a = new Append(ONE_ROW);
510    a.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
511    a.addColumn(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
512    List<Row> actions = new ArrayList<>();
513    actions.add(inc);
514    actions.add(a);
515
516    Object[] multiRes = new Object[actions.size()];
517    table.batch(actions, multiRes);
518    validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
519    validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
520    validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
521    validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
522    table.close();
523  }
524
525  @Test
526  public void testNonceCollision() throws Exception {
527    LOG.info("test=testNonceCollision");
528    final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
529    Table table = connection.getTable(TEST_TABLE);
530    Put put = new Put(ONE_ROW);
531    put.addColumn(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
532
533    // Replace nonce manager with the one that returns each nonce twice.
534    NonceGenerator cnm = new NonceGenerator() {
535
536      private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get();
537
538      private long lastNonce = -1;
539
540      @Override
541      public synchronized long newNonce() {
542        long nonce = 0;
543        if (lastNonce == -1) {
544          lastNonce = nonce = delegate.newNonce();
545        } else {
546          nonce = lastNonce;
547          lastNonce = -1L;
548        }
549        return nonce;
550      }
551
552      @Override
553      public long getNonceGroup() {
554        return delegate.getNonceGroup();
555      }
556    };
557
558    NonceGenerator oldCnm =
559      ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm);
560
561    // First test sequential requests.
562    try {
563      Increment inc = new Increment(ONE_ROW);
564      inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
565      table.increment(inc);
566
567      // duplicate increment
568      inc = new Increment(ONE_ROW);
569      inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
570      Result result = table.increment(inc);
571      validateResult(result, QUALIFIER, Bytes.toBytes(1L));
572
573      Get get = new Get(ONE_ROW);
574      get.addColumn(BYTES_FAMILY, QUALIFIER);
575      result = table.get(get);
576      validateResult(result, QUALIFIER, Bytes.toBytes(1L));
577
578      // Now run a bunch of requests in parallel, exactly half should succeed.
579      int numRequests = 40;
580      final CountDownLatch startedLatch = new CountDownLatch(numRequests);
581      final CountDownLatch startLatch = new CountDownLatch(1);
582      final CountDownLatch doneLatch = new CountDownLatch(numRequests);
583      for (int i = 0; i < numRequests; ++i) {
584        Runnable r = new Runnable() {
585          @Override
586          public void run() {
587            Table table = null;
588            try {
589              table = connection.getTable(TEST_TABLE);
590            } catch (IOException e) {
591              fail("Not expected");
592            }
593            Increment inc = new Increment(ONE_ROW);
594            inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
595            startedLatch.countDown();
596            try {
597              startLatch.await();
598            } catch (InterruptedException e) {
599              fail("Not expected");
600            }
601            try {
602              table.increment(inc);
603            } catch (IOException ioEx) {
604              fail("Not expected");
605            }
606            doneLatch.countDown();
607          }
608        };
609        Threads.setDaemonThreadRunning(new Thread(r));
610      }
611      startedLatch.await(); // Wait until all threads are ready...
612      startLatch.countDown(); // ...and unleash the herd!
613      doneLatch.await();
614      // Now verify
615      get = new Get(ONE_ROW);
616      get.addColumn(BYTES_FAMILY, QUALIFIER);
617      result = table.get(get);
618      validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
619      table.close();
620    } finally {
621      ConnectionImplementation.injectNonceGeneratorForTesting((ClusterConnection) connection, oldCnm);
622    }
623  }
624
625  @Test
626  public void testBatchWithMixedActions() throws Exception {
627    LOG.info("test=testBatchWithMixedActions");
628    Table table = UTIL.getConnection().getTable(TEST_TABLE);
629
630    // Load some data to start
631    List<Put> puts = constructPutRequests();
632    Object[] results = new Object[puts.size()];
633    table.batch(puts, results);
634    validateSizeAndEmpty(results, KEYS.length);
635
636    // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
637    // put
638    List<Row> actions = new ArrayList<>();
639
640    byte[] qual2 = Bytes.toBytes("qual2");
641    byte[] val2 = Bytes.toBytes("putvalue2");
642
643    // 0 get
644    Get get = new Get(KEYS[10]);
645    get.addColumn(BYTES_FAMILY, QUALIFIER);
646    actions.add(get);
647
648    // 1 get
649    get = new Get(KEYS[11]);
650    get.addColumn(BYTES_FAMILY, QUALIFIER);
651    actions.add(get);
652
653    // 2 put of new column
654    Put put = new Put(KEYS[10]);
655    put.addColumn(BYTES_FAMILY, qual2, val2);
656    actions.add(put);
657
658    // 3 delete
659    Delete delete = new Delete(KEYS[20]);
660    delete.addFamily(BYTES_FAMILY);
661    actions.add(delete);
662
663    // 4 get
664    get = new Get(KEYS[30]);
665    get.addColumn(BYTES_FAMILY, QUALIFIER);
666    actions.add(get);
667
668    // There used to be a 'get' of a previous put here, but removed
669    // since this API really cannot guarantee order in terms of mixed
670    // get/puts.
671
672    // 5 put of new column
673    put = new Put(KEYS[40]);
674    put.addColumn(BYTES_FAMILY, qual2, val2);
675    actions.add(put);
676
677    // 6 RowMutations
678    RowMutations rm = new RowMutations(KEYS[50]);
679    put = new Put(KEYS[50]);
680    put.addColumn(BYTES_FAMILY, qual2, val2);
681    rm.add((Mutation) put);
682    byte[] qual3 = Bytes.toBytes("qual3");
683    byte[] val3 = Bytes.toBytes("putvalue3");
684    put = new Put(KEYS[50]);
685    put.addColumn(BYTES_FAMILY, qual3, val3);
686    rm.add((Mutation) put);
687    actions.add(rm);
688
689    // 7 Add another Get to the mixed sequence after RowMutations
690    get = new Get(KEYS[10]);
691    get.addColumn(BYTES_FAMILY, QUALIFIER);
692    actions.add(get);
693
694    results = new Object[actions.size()];
695    table.batch(actions, results);
696
697    // Validation
698
699    validateResult(results[0]);
700    validateResult(results[1]);
701    validateEmpty(results[3]);
702    validateResult(results[4]);
703    validateEmpty(results[5]);
704    validateEmpty(results[6]);
705    validateResult(results[7]);
706
707    // validate last put, externally from the batch
708    get = new Get(KEYS[40]);
709    get.addColumn(BYTES_FAMILY, qual2);
710    Result r = table.get(get);
711    validateResult(r, qual2, val2);
712
713    // validate last RowMutations, externally from the batch
714    get = new Get(KEYS[50]);
715    get.addColumn(BYTES_FAMILY, qual2);
716    r = table.get(get);
717    validateResult(r, qual2, val2);
718
719    get = new Get(KEYS[50]);
720    get.addColumn(BYTES_FAMILY, qual3);
721    r = table.get(get);
722    validateResult(r, qual3, val3);
723
724    table.close();
725  }
726
727  // // Helper methods ////
728
  /**
   * Validates that {@code r} holds {@link #VALUE} under {@link #QUALIFIER} by delegating to
   * {@link #validateResult(Object, byte[], byte[])}.
   *
   * @param r the batch result entry to check
   */
  private void validateResult(Object r) {
    validateResult(r, QUALIFIER, VALUE);
  }
732
733  private void validateResult(Object r1, byte[] qual, byte[] val) {
734    Result r = (Result)r1;
735    Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
736    byte[] value = r.getValue(BYTES_FAMILY, qual);
737    if (0 != Bytes.compareTo(val, value)) {
738      fail("Expected [" + Bytes.toStringBinary(val)
739          + "] but got [" + Bytes.toStringBinary(value) + "]");
740    }
741  }
742
743  private List<Put> constructPutRequests() {
744    List<Put> puts = new ArrayList<>();
745    for (byte[] k : KEYS) {
746      Put put = new Put(k);
747      put.addColumn(BYTES_FAMILY, QUALIFIER, VALUE);
748      puts.add(put);
749    }
750    return puts;
751  }
752
753  private void validateLoadedData(Table table) throws IOException {
754    // get the data back and validate that it is correct
755    LOG.info("Validating data on " + table);
756    List<Get> gets = new ArrayList<>();
757    for (byte[] k : KEYS) {
758      Get get = new Get(k);
759      get.addColumn(BYTES_FAMILY, QUALIFIER);
760      gets.add(get);
761    }
762    int retryNum = 10;
763    Result[] results = null;
764    do  {
765      results = table.get(gets);
766      boolean finished = true;
767      for (Result result : results) {
768        if (result.isEmpty()) {
769          finished = false;
770          break;
771        }
772      }
773      if (finished) {
774        break;
775      }
776      try {
777        Thread.sleep(10);
778      } catch (InterruptedException e) {
779      }
780      retryNum--;
781    } while (retryNum > 0);
782
783    if (retryNum == 0) {
784      fail("Timeout for validate data");
785    } else {
786      if (results != null) {
787        for (Result r : results) {
788          Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
789          Assert.assertEquals(0, Bytes.compareTo(VALUE, r
790            .getValue(BYTES_FAMILY, QUALIFIER)));
791        }
792        LOG.info("Validating data on " + table + " successfully!");
793      }
794    }
795  }
796
797  private void validateEmpty(Object r1) {
798    Result result = (Result)r1;
799    Assert.assertTrue(result != null);
800    Assert.assertTrue(result.isEmpty());
801  }
802
803  private void validateSizeAndEmpty(Object[] results, int expectedSize) {
804    // Validate got back the same number of Result objects, all empty
805    Assert.assertEquals(expectedSize, results.length);
806    for (Object result : results) {
807      validateEmpty(result);
808    }
809  }
810
811  public static class MyMasterObserver implements MasterObserver, MasterCoprocessor {
812    private static final AtomicInteger postBalanceCount = new AtomicInteger(0);
813    private static final AtomicBoolean start = new AtomicBoolean(false);
814
815    @Override
816    public void start(CoprocessorEnvironment env) throws IOException {
817      start.set(true);
818    }
819
820    @Override
821    public Optional<MasterObserver> getMasterObserver() {
822      return Optional.of(this);
823    }
824
825    @Override
826    public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx,
827        List<RegionPlan> plans) throws IOException {
828      if (!plans.isEmpty()) {
829        postBalanceCount.incrementAndGet();
830      }
831    }
832  }
833}