001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Optional;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.CoprocessorEnvironment;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.Waiter;
044import org.apache.hadoop.hbase.codec.KeyValueCodec;
045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
046import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
047import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
048import org.apache.hadoop.hbase.coprocessor.MasterObserver;
049import org.apache.hadoop.hbase.coprocessor.ObserverContext;
050import org.apache.hadoop.hbase.master.RegionPlan;
051import org.apache.hadoop.hbase.testclassification.FlakeyTests;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.JVMClusterUtil;
055import org.apache.hadoop.hbase.util.Threads;
056import org.junit.AfterClass;
057import org.junit.Assert;
058import org.junit.Before;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066@Category({MediumTests.class, FlakeyTests.class})
067public class TestMultiParallel {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071      HBaseClassTestRule.forClass(TestMultiParallel.class);
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class);
074
075  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
076  private static final byte[] VALUE = Bytes.toBytes("value");
077  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
078  private static final String FAMILY = "family";
079  private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
080  private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
081  private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
082  private static final byte [][] KEYS = makeKeys();
083
084  private static final int slaves = 5; // also used for testing HTable pool size
085  private static Connection CONNECTION;
086
087  @BeforeClass
088  public static void beforeClass() throws Exception {
089    // Uncomment the following lines if more verbosity is needed for
090    // debugging (see HBASE-12285 for details).
091    //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
092    //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
093    //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
094    UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
095        KeyValueCodec.class.getCanonicalName());
096    // Disable table on master for now as the feature is broken
097    //UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
098    // We used to ask for system tables on Master exclusively but not needed by test and doesn't
099    // work anyways -- so commented out.
100    // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
101    UTIL.getConfiguration()
102        .set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyMasterObserver.class.getName());
103    UTIL.startMiniCluster(slaves);
104    Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
105    UTIL.waitTableEnabled(TEST_TABLE);
106    t.close();
107    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
108    assertTrue(MyMasterObserver.start.get());
109  }
110
111  @AfterClass
112  public static void afterClass() throws Exception {
113    CONNECTION.close();
114    UTIL.shutdownMiniCluster();
115  }
116
117  @Before
118  public void before() throws Exception {
119    final int balanceCount = MyMasterObserver.postBalanceCount.get();
120    LOG.info("before");
121    if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
122      // Distribute regions
123      UTIL.getMiniHBaseCluster().getMaster().balance();
124      // Some plans are created.
125      if (MyMasterObserver.postBalanceCount.get() > balanceCount) {
126        // It is necessary to wait the move procedure to start.
127        // Otherwise, the next wait may pass immediately.
128        UTIL.waitFor(3 * 1000, 100, false, () ->
129            UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition()
130        );
131      }
132
133      // Wait until completing balance
134      UTIL.waitUntilAllRegionsAssigned(TEST_TABLE);
135    }
136    LOG.info("before done");
137  }
138
139  private static byte[][] makeKeys() {
140    byte [][] starterKeys = HBaseTestingUtility.KEYS;
141    // Create a "non-uniform" test set with the following characteristics:
142    // a) Unequal number of keys per region
143
144    // Don't use integer as a multiple, so that we have a number of keys that is
145    // not a multiple of the number of regions
146    int numKeys = (int) (starterKeys.length * 10.33F);
147
148    List<byte[]> keys = new ArrayList<>();
149    for (int i = 0; i < numKeys; i++) {
150      int kIdx = i % starterKeys.length;
151      byte[] k = starterKeys[kIdx];
152      byte[] cp = new byte[k.length + 1];
153      System.arraycopy(k, 0, cp, 0, k.length);
154      cp[k.length] = new Integer(i % 256).byteValue();
155      keys.add(cp);
156    }
157
158    // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
159    // should work)
160    // c) keys are not in sorted order (within a region), to ensure that the
161    // sorting code and index mapping doesn't break the functionality
162    for (int i = 0; i < 100; i++) {
163      int kIdx = i % starterKeys.length;
164      byte[] k = starterKeys[kIdx];
165      byte[] cp = new byte[k.length + 1];
166      System.arraycopy(k, 0, cp, 0, k.length);
167      cp[k.length] = new Integer(i % 256).byteValue();
168      keys.add(cp);
169    }
170    return keys.toArray(new byte [][] {new byte [] {}});
171  }
172
173
174  /**
175   * This is for testing the active number of threads that were used while
176   * doing a batch operation. It inserts one row per region via the batch
177   * operation, and then checks the number of active threads.
178   * <p/>
179   * For HBASE-3553
180   */
181  @Test
182  public void testActiveThreadsCount() throws Exception {
183    UTIL.getConfiguration().setLong("hbase.htable.threads.coresize", slaves + 1);
184    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) {
185      ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration());
186      try {
187        try (Table t = connection.getTable(TEST_TABLE, executor)) {
188          List<Put> puts = constructPutRequests(); // creates a Put for every region
189          t.batch(puts, null);
190          HashSet<ServerName> regionservers = new HashSet<>();
191          try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) {
192            for (Row r : puts) {
193              HRegionLocation location = locator.getRegionLocation(r.getRow());
194              regionservers.add(location.getServerName());
195            }
196          }
197          assertEquals(regionservers.size(), executor.getLargestPoolSize());
198        }
199      } finally {
200        executor.shutdownNow();
201      }
202    }
203  }
204
205  @Test
206  public void testBatchWithGet() throws Exception {
207    LOG.info("test=testBatchWithGet");
208    Table table = UTIL.getConnection().getTable(TEST_TABLE);
209
210    // load test data
211    List<Put> puts = constructPutRequests();
212    table.batch(puts, null);
213
214    // create a list of gets and run it
215    List<Row> gets = new ArrayList<>();
216    for (byte[] k : KEYS) {
217      Get get = new Get(k);
218      get.addColumn(BYTES_FAMILY, QUALIFIER);
219      gets.add(get);
220    }
221    Result[] multiRes = new Result[gets.size()];
222    table.batch(gets, multiRes);
223
224    // Same gets using individual call API
225    List<Result> singleRes = new ArrayList<>();
226    for (Row get : gets) {
227      singleRes.add(table.get((Get) get));
228    }
229    // Compare results
230    Assert.assertEquals(singleRes.size(), multiRes.length);
231    for (int i = 0; i < singleRes.size(); i++) {
232      Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
233      Cell[] singleKvs = singleRes.get(i).rawCells();
234      Cell[] multiKvs = multiRes[i].rawCells();
235      for (int j = 0; j < singleKvs.length; j++) {
236        Assert.assertEquals(singleKvs[j], multiKvs[j]);
237        Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]),
238            CellUtil.cloneValue(multiKvs[j])));
239      }
240    }
241    table.close();
242  }
243
244  @Test
245  public void testBadFam() throws Exception {
246    LOG.info("test=testBadFam");
247    Table table = UTIL.getConnection().getTable(TEST_TABLE);
248
249    List<Row> actions = new ArrayList<>();
250    Put p = new Put(Bytes.toBytes("row1"));
251    p.addColumn(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
252    actions.add(p);
253    p = new Put(Bytes.toBytes("row2"));
254    p.addColumn(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
255    actions.add(p);
256
257    // row1 and row2 should be in the same region.
258
259    Object [] r = new Object[actions.size()];
260    try {
261      table.batch(actions, r);
262      fail();
263    } catch (RetriesExhaustedWithDetailsException ex) {
264      LOG.debug(ex.toString(), ex);
265      // good!
266      assertFalse(ex.mayHaveClusterIssues());
267    }
268    assertEquals(2, r.length);
269    assertTrue(r[0] instanceof Throwable);
270    assertTrue(r[1] instanceof Result);
271    table.close();
272  }
273
274  @Test
275  public void testFlushCommitsNoAbort() throws Exception {
276    LOG.info("test=testFlushCommitsNoAbort");
277    doTestFlushCommits(false);
278  }
279
280  /**
281   * Only run one Multi test with a forced RegionServer abort. Otherwise, the
282   * unit tests will take an unnecessarily long time to run.
283   */
284  @Test
285  public void testFlushCommitsWithAbort() throws Exception {
286    LOG.info("test=testFlushCommitsWithAbort");
287    doTestFlushCommits(true);
288  }
289
290  /**
291   * Set table auto flush to false and test flushing commits
292   * @param doAbort true if abort one regionserver in the testing
293   */
294  private void doTestFlushCommits(boolean doAbort) throws Exception {
295    // Load the data
296    LOG.info("get new table");
297    Table table = UTIL.getConnection().getTable(TEST_TABLE);
298
299    LOG.info("constructPutRequests");
300    List<Put> puts = constructPutRequests();
301    table.put(puts);
302    LOG.info("puts");
303    final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
304        .size();
305    assert liveRScount > 0;
306    JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
307        .getLiveRegionServerThreads().get(0);
308    if (doAbort) {
309      liveRS.getRegionServer().abort("Aborting for tests",
310          new Exception("doTestFlushCommits"));
311      // If we wait for no regions being online after we abort the server, we
312      // could ensure the master has re-assigned the regions on killed server
313      // after writing successfully. It means the server we aborted is dead
314      // and detected by matser
315      while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
316        Thread.sleep(100);
317      }
318      // try putting more keys after the abort. same key/qual... just validating
319      // no exceptions thrown
320      puts = constructPutRequests();
321      table.put(puts);
322    }
323
324    LOG.info("validating loaded data");
325    validateLoadedData(table);
326
327    // Validate server and region count
328    List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
329    int count = 0;
330    for (JVMClusterUtil.RegionServerThread t: liveRSs) {
331      count++;
332      LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
333    }
334    LOG.info("Count=" + count);
335    Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
336        (doAbort ? (liveRScount - 1) : liveRScount), count);
337    if (doAbort) {
338      UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
339      UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
340        @Override
341        public boolean evaluate() throws Exception {
342          // We disable regions on master so the count should be liveRScount - 1
343          return UTIL.getMiniHBaseCluster().getMaster()
344              .getClusterMetrics().getLiveServerMetrics().size() == liveRScount - 1;
345        }
346      });
347      UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
348    }
349
350    table.close();
351    LOG.info("done");
352  }
353
354  @Test
355  public void testBatchWithPut() throws Exception {
356    LOG.info("test=testBatchWithPut");
357    Table table = CONNECTION.getTable(TEST_TABLE);
358    // put multiple rows using a batch
359    List<Put> puts = constructPutRequests();
360
361    Object[] results = new Object[puts.size()];
362    table.batch(puts, results);
363    validateSizeAndEmpty(results, KEYS.length);
364
365    if (true) {
366      int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
367      assert liveRScount > 0;
368      JVMClusterUtil.RegionServerThread liveRS =
369        UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
370      liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
371      puts = constructPutRequests();
372      try {
373        results = new Object[puts.size()];
374        table.batch(puts, results);
375      } catch (RetriesExhaustedWithDetailsException ree) {
376        LOG.info(ree.getExhaustiveDescription());
377        table.close();
378        throw ree;
379      }
380      validateSizeAndEmpty(results, KEYS.length);
381    }
382
383    validateLoadedData(table);
384    table.close();
385  }
386
387  @Test
388  public void testBatchWithDelete() throws Exception {
389    LOG.info("test=testBatchWithDelete");
390    Table table = UTIL.getConnection().getTable(TEST_TABLE);
391
392    // Load some data
393    List<Put> puts = constructPutRequests();
394    Object[] results = new Object[puts.size()];
395    table.batch(puts, results);
396    validateSizeAndEmpty(results, KEYS.length);
397
398    // Deletes
399    List<Row> deletes = new ArrayList<>();
400    for (int i = 0; i < KEYS.length; i++) {
401      Delete delete = new Delete(KEYS[i]);
402      delete.addFamily(BYTES_FAMILY);
403      deletes.add(delete);
404    }
405    results= new Object[deletes.size()];
406    table.batch(deletes, results);
407    validateSizeAndEmpty(results, KEYS.length);
408
409    // Get to make sure ...
410    for (byte[] k : KEYS) {
411      Get get = new Get(k);
412      get.addColumn(BYTES_FAMILY, QUALIFIER);
413      Assert.assertFalse(table.exists(get));
414    }
415    table.close();
416  }
417
418  @Test
419  public void testHTableDeleteWithList() throws Exception {
420    LOG.info("test=testHTableDeleteWithList");
421    Table table = UTIL.getConnection().getTable(TEST_TABLE);
422
423    // Load some data
424    List<Put> puts = constructPutRequests();
425    Object[] results = new Object[puts.size()];
426    table.batch(puts, results);
427    validateSizeAndEmpty(results, KEYS.length);
428
429    // Deletes
430    ArrayList<Delete> deletes = new ArrayList<>();
431    for (int i = 0; i < KEYS.length; i++) {
432      Delete delete = new Delete(KEYS[i]);
433      delete.addFamily(BYTES_FAMILY);
434      deletes.add(delete);
435    }
436    table.delete(deletes);
437    Assert.assertTrue(deletes.isEmpty());
438
439    // Get to make sure ...
440    for (byte[] k : KEYS) {
441      Get get = new Get(k);
442      get.addColumn(BYTES_FAMILY, QUALIFIER);
443      Assert.assertFalse(table.exists(get));
444    }
445    table.close();
446  }
447
448  @Test
449  public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
450    LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
451    Table table = UTIL.getConnection().getTable(TEST_TABLE);
452
453    List<Row> puts = new ArrayList<>();
454    for (int i = 0; i < 100; i++) {
455      Put put = new Put(ONE_ROW);
456      byte[] qual = Bytes.toBytes("column" + i);
457      put.addColumn(BYTES_FAMILY, qual, VALUE);
458      puts.add(put);
459    }
460    Object[] results = new Object[puts.size()];
461    table.batch(puts, results);
462
463    // validate
464    validateSizeAndEmpty(results, 100);
465
466    // get the data back and validate that it is correct
467    List<Row> gets = new ArrayList<>();
468    for (int i = 0; i < 100; i++) {
469      Get get = new Get(ONE_ROW);
470      byte[] qual = Bytes.toBytes("column" + i);
471      get.addColumn(BYTES_FAMILY, qual);
472      gets.add(get);
473    }
474
475    Object[] multiRes = new Object[gets.size()];
476    table.batch(gets, multiRes);
477
478    int idx = 0;
479    for (Object r : multiRes) {
480      byte[] qual = Bytes.toBytes("column" + idx);
481      validateResult(r, qual, VALUE);
482      idx++;
483    }
484    table.close();
485  }
486
487  @Test
488  public void testBatchWithIncrementAndAppend() throws Exception {
489    LOG.info("test=testBatchWithIncrementAndAppend");
490    final byte[] QUAL1 = Bytes.toBytes("qual1");
491    final byte[] QUAL2 = Bytes.toBytes("qual2");
492    final byte[] QUAL3 = Bytes.toBytes("qual3");
493    final byte[] QUAL4 = Bytes.toBytes("qual4");
494    Table table = UTIL.getConnection().getTable(TEST_TABLE);
495    Delete d = new Delete(ONE_ROW);
496    table.delete(d);
497    Put put = new Put(ONE_ROW);
498    put.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
499    put.addColumn(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
500    table.put(put);
501
502    Increment inc = new Increment(ONE_ROW);
503    inc.addColumn(BYTES_FAMILY, QUAL2, 1);
504    inc.addColumn(BYTES_FAMILY, QUAL3, 1);
505
506    Append a = new Append(ONE_ROW);
507    a.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
508    a.addColumn(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
509    List<Row> actions = new ArrayList<>();
510    actions.add(inc);
511    actions.add(a);
512
513    Object[] multiRes = new Object[actions.size()];
514    table.batch(actions, multiRes);
515    validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
516    validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
517    validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
518    validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
519    table.close();
520  }
521
522  @Test
523  public void testNonceCollision() throws Exception {
524    LOG.info("test=testNonceCollision");
525    final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
526    Table table = connection.getTable(TEST_TABLE);
527    Put put = new Put(ONE_ROW);
528    put.addColumn(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
529
530    // Replace nonce manager with the one that returns each nonce twice.
531    NonceGenerator cnm = new NonceGenerator() {
532
533      private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get();
534
535      private long lastNonce = -1;
536
537      @Override
538      public synchronized long newNonce() {
539        long nonce = 0;
540        if (lastNonce == -1) {
541          lastNonce = nonce = delegate.newNonce();
542        } else {
543          nonce = lastNonce;
544          lastNonce = -1L;
545        }
546        return nonce;
547      }
548
549      @Override
550      public long getNonceGroup() {
551        return delegate.getNonceGroup();
552      }
553    };
554
555    NonceGenerator oldCnm =
556      ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm);
557
558    // First test sequential requests.
559    try {
560      Increment inc = new Increment(ONE_ROW);
561      inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
562      table.increment(inc);
563
564      // duplicate increment
565      inc = new Increment(ONE_ROW);
566      inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
567      Result result = table.increment(inc);
568      validateResult(result, QUALIFIER, Bytes.toBytes(1L));
569
570      Get get = new Get(ONE_ROW);
571      get.addColumn(BYTES_FAMILY, QUALIFIER);
572      result = table.get(get);
573      validateResult(result, QUALIFIER, Bytes.toBytes(1L));
574
575      // Now run a bunch of requests in parallel, exactly half should succeed.
576      int numRequests = 40;
577      final CountDownLatch startedLatch = new CountDownLatch(numRequests);
578      final CountDownLatch startLatch = new CountDownLatch(1);
579      final CountDownLatch doneLatch = new CountDownLatch(numRequests);
580      for (int i = 0; i < numRequests; ++i) {
581        Runnable r = new Runnable() {
582          @Override
583          public void run() {
584            Table table = null;
585            try {
586              table = connection.getTable(TEST_TABLE);
587            } catch (IOException e) {
588              fail("Not expected");
589            }
590            Increment inc = new Increment(ONE_ROW);
591            inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
592            startedLatch.countDown();
593            try {
594              startLatch.await();
595            } catch (InterruptedException e) {
596              fail("Not expected");
597            }
598            try {
599              table.increment(inc);
600            } catch (IOException ioEx) {
601              fail("Not expected");
602            }
603            doneLatch.countDown();
604          }
605        };
606        Threads.setDaemonThreadRunning(new Thread(r));
607      }
608      startedLatch.await(); // Wait until all threads are ready...
609      startLatch.countDown(); // ...and unleash the herd!
610      doneLatch.await();
611      // Now verify
612      get = new Get(ONE_ROW);
613      get.addColumn(BYTES_FAMILY, QUALIFIER);
614      result = table.get(get);
615      validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
616      table.close();
617    } finally {
618      ConnectionImplementation.injectNonceGeneratorForTesting((ClusterConnection) connection, oldCnm);
619    }
620  }
621
622  @Test
623  public void testBatchWithMixedActions() throws Exception {
624    LOG.info("test=testBatchWithMixedActions");
625    Table table = UTIL.getConnection().getTable(TEST_TABLE);
626
627    // Load some data to start
628    List<Put> puts = constructPutRequests();
629    Object[] results = new Object[puts.size()];
630    table.batch(puts, results);
631    validateSizeAndEmpty(results, KEYS.length);
632
633    // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
634    // put
635    List<Row> actions = new ArrayList<>();
636
637    byte[] qual2 = Bytes.toBytes("qual2");
638    byte[] val2 = Bytes.toBytes("putvalue2");
639
640    // 0 get
641    Get get = new Get(KEYS[10]);
642    get.addColumn(BYTES_FAMILY, QUALIFIER);
643    actions.add(get);
644
645    // 1 get
646    get = new Get(KEYS[11]);
647    get.addColumn(BYTES_FAMILY, QUALIFIER);
648    actions.add(get);
649
650    // 2 put of new column
651    Put put = new Put(KEYS[10]);
652    put.addColumn(BYTES_FAMILY, qual2, val2);
653    actions.add(put);
654
655    // 3 delete
656    Delete delete = new Delete(KEYS[20]);
657    delete.addFamily(BYTES_FAMILY);
658    actions.add(delete);
659
660    // 4 get
661    get = new Get(KEYS[30]);
662    get.addColumn(BYTES_FAMILY, QUALIFIER);
663    actions.add(get);
664
665    // There used to be a 'get' of a previous put here, but removed
666    // since this API really cannot guarantee order in terms of mixed
667    // get/puts.
668
669    // 5 put of new column
670    put = new Put(KEYS[40]);
671    put.addColumn(BYTES_FAMILY, qual2, val2);
672    actions.add(put);
673
674    // 6 RowMutations
675    RowMutations rm = new RowMutations(KEYS[50]);
676    put = new Put(KEYS[50]);
677    put.addColumn(BYTES_FAMILY, qual2, val2);
678    rm.add((Mutation) put);
679    byte[] qual3 = Bytes.toBytes("qual3");
680    byte[] val3 = Bytes.toBytes("putvalue3");
681    put = new Put(KEYS[50]);
682    put.addColumn(BYTES_FAMILY, qual3, val3);
683    rm.add((Mutation) put);
684    actions.add(rm);
685
686    // 7 Add another Get to the mixed sequence after RowMutations
687    get = new Get(KEYS[10]);
688    get.addColumn(BYTES_FAMILY, QUALIFIER);
689    actions.add(get);
690
691    results = new Object[actions.size()];
692    table.batch(actions, results);
693
694    // Validation
695
696    validateResult(results[0]);
697    validateResult(results[1]);
698    validateEmpty(results[3]);
699    validateResult(results[4]);
700    validateEmpty(results[5]);
701    validateEmpty(results[6]);
702    validateResult(results[7]);
703
704    // validate last put, externally from the batch
705    get = new Get(KEYS[40]);
706    get.addColumn(BYTES_FAMILY, qual2);
707    Result r = table.get(get);
708    validateResult(r, qual2, val2);
709
710    // validate last RowMutations, externally from the batch
711    get = new Get(KEYS[50]);
712    get.addColumn(BYTES_FAMILY, qual2);
713    r = table.get(get);
714    validateResult(r, qual2, val2);
715
716    get = new Get(KEYS[50]);
717    get.addColumn(BYTES_FAMILY, qual3);
718    r = table.get(get);
719    validateResult(r, qual3, val3);
720
721    table.close();
722  }
723
724  // // Helper methods ////
725
726  private void validateResult(Object r) {
727    validateResult(r, QUALIFIER, VALUE);
728  }
729
730  private void validateResult(Object r1, byte[] qual, byte[] val) {
731    Result r = (Result)r1;
732    Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
733    byte[] value = r.getValue(BYTES_FAMILY, qual);
734    if (0 != Bytes.compareTo(val, value)) {
735      fail("Expected [" + Bytes.toStringBinary(val)
736          + "] but got [" + Bytes.toStringBinary(value) + "]");
737    }
738  }
739
740  private List<Put> constructPutRequests() {
741    List<Put> puts = new ArrayList<>();
742    for (byte[] k : KEYS) {
743      Put put = new Put(k);
744      put.addColumn(BYTES_FAMILY, QUALIFIER, VALUE);
745      puts.add(put);
746    }
747    return puts;
748  }
749
750  private void validateLoadedData(Table table) throws IOException {
751    // get the data back and validate that it is correct
752    LOG.info("Validating data on " + table);
753    List<Get> gets = new ArrayList<>();
754    for (byte[] k : KEYS) {
755      Get get = new Get(k);
756      get.addColumn(BYTES_FAMILY, QUALIFIER);
757      gets.add(get);
758    }
759    int retryNum = 10;
760    Result[] results = null;
761    do  {
762      results = table.get(gets);
763      boolean finished = true;
764      for (Result result : results) {
765        if (result.isEmpty()) {
766          finished = false;
767          break;
768        }
769      }
770      if (finished) {
771        break;
772      }
773      try {
774        Thread.sleep(10);
775      } catch (InterruptedException e) {
776      }
777      retryNum--;
778    } while (retryNum > 0);
779
780    if (retryNum == 0) {
781      fail("Timeout for validate data");
782    } else {
783      if (results != null) {
784        for (Result r : results) {
785          Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
786          Assert.assertEquals(0, Bytes.compareTo(VALUE, r
787            .getValue(BYTES_FAMILY, QUALIFIER)));
788        }
789        LOG.info("Validating data on " + table + " successfully!");
790      }
791    }
792  }
793
794  private void validateEmpty(Object r1) {
795    Result result = (Result)r1;
796    Assert.assertTrue(result != null);
797    Assert.assertTrue(result.isEmpty());
798  }
799
800  private void validateSizeAndEmpty(Object[] results, int expectedSize) {
801    // Validate got back the same number of Result objects, all empty
802    Assert.assertEquals(expectedSize, results.length);
803    for (Object result : results) {
804      validateEmpty(result);
805    }
806  }
807
808  public static class MyMasterObserver implements MasterObserver, MasterCoprocessor {
809    private static final AtomicInteger postBalanceCount = new AtomicInteger(0);
810    private static final AtomicBoolean start = new AtomicBoolean(false);
811
812    @Override
813    public void start(CoprocessorEnvironment env) throws IOException {
814      start.set(true);
815    }
816
817    @Override
818    public Optional<MasterObserver> getMasterObserver() {
819      return Optional.of(this);
820    }
821
822    @Override
823    public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx,
824        List<RegionPlan> plans) throws IOException {
825      if (!plans.isEmpty()) {
826        postBalanceCount.incrementAndGet();
827      }
828    }
829  }
830}