001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Optional;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.CoprocessorEnvironment;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.Waiter;
044import org.apache.hadoop.hbase.codec.KeyValueCodec;
045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
046import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
047import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
048import org.apache.hadoop.hbase.coprocessor.MasterObserver;
049import org.apache.hadoop.hbase.coprocessor.ObserverContext;
050import org.apache.hadoop.hbase.master.RegionPlan;
051import org.apache.hadoop.hbase.testclassification.FlakeyTests;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.JVMClusterUtil;
055import org.apache.hadoop.hbase.util.Threads;
056import org.junit.AfterClass;
057import org.junit.Assert;
058import org.junit.Before;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066@Category({ MediumTests.class, FlakeyTests.class })
067public class TestMultiParallel {
068
  // Class-level JUnit rule obtained from HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMultiParallel.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class);

  // Shared testing utility; beforeClass() starts its mini cluster and afterClass() shuts it down.
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  // Cell value written by constructPutRequests() and expected back by the validate* helpers.
  private static final byte[] VALUE = Bytes.toBytes("value");
  // Default column qualifier used by the puts and gets in this class.
  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
  // Name of the single column family of the test table.
  private static final String FAMILY = "family";
  private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
  // Byte form of FAMILY, as required by the client API calls.
  private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
  // Row key used by the tests that operate on a single row.
  private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
  // Row keys produced once by makeKeys(); contains duplicates and is not sorted.
  private static final byte[][] KEYS = makeKeys();

  // Number of region servers requested from the mini cluster.
  private static final int slaves = 5; // also used for testing HTable pool size
  // Connection opened in beforeClass() and closed in afterClass().
  private static Connection CONNECTION;
086
087  @BeforeClass
088  public static void beforeClass() throws Exception {
089    // Uncomment the following lines if more verbosity is needed for
090    // debugging (see HBASE-12285 for details).
091    // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
092    // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
093    // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
094    UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
095      KeyValueCodec.class.getCanonicalName());
096    // Disable table on master for now as the feature is broken
097    // UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
098    // We used to ask for system tables on Master exclusively but not needed by test and doesn't
099    // work anyways -- so commented out.
100    // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
101    UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
102      MyMasterObserver.class.getName());
103    UTIL.startMiniCluster(slaves);
104    Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
105    UTIL.waitTableEnabled(TEST_TABLE);
106    t.close();
107    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
108    assertTrue(MyMasterObserver.start.get());
109  }
110
111  @AfterClass
112  public static void afterClass() throws Exception {
113    CONNECTION.close();
114    UTIL.shutdownMiniCluster();
115  }
116
117  @Before
118  public void before() throws Exception {
119    final int balanceCount = MyMasterObserver.postBalanceCount.get();
120    LOG.info("before");
121    if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
122      // Distribute regions
123      UTIL.getMiniHBaseCluster().getMaster().balance();
124      // Some plans are created.
125      if (MyMasterObserver.postBalanceCount.get() > balanceCount) {
126        // It is necessary to wait the move procedure to start.
127        // Otherwise, the next wait may pass immediately.
128        UTIL.waitFor(3 * 1000, 100, false, () -> UTIL.getMiniHBaseCluster().getMaster()
129          .getAssignmentManager().hasRegionsInTransition());
130      }
131
132      // Wait until completing balance
133      UTIL.waitUntilAllRegionsAssigned(TEST_TABLE);
134    }
135    LOG.info("before done");
136  }
137
138  private static byte[][] makeKeys() {
139    byte[][] starterKeys = HBaseTestingUtility.KEYS;
140    // Create a "non-uniform" test set with the following characteristics:
141    // a) Unequal number of keys per region
142
143    // Don't use integer as a multiple, so that we have a number of keys that is
144    // not a multiple of the number of regions
145    int numKeys = (int) (starterKeys.length * 10.33F);
146
147    List<byte[]> keys = new ArrayList<>();
148    for (int i = 0; i < numKeys; i++) {
149      int kIdx = i % starterKeys.length;
150      byte[] k = starterKeys[kIdx];
151      byte[] cp = new byte[k.length + 1];
152      System.arraycopy(k, 0, cp, 0, k.length);
153      cp[k.length] = new Integer(i % 256).byteValue();
154      keys.add(cp);
155    }
156
157    // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
158    // should work)
159    // c) keys are not in sorted order (within a region), to ensure that the
160    // sorting code and index mapping doesn't break the functionality
161    for (int i = 0; i < 100; i++) {
162      int kIdx = i % starterKeys.length;
163      byte[] k = starterKeys[kIdx];
164      byte[] cp = new byte[k.length + 1];
165      System.arraycopy(k, 0, cp, 0, k.length);
166      cp[k.length] = new Integer(i % 256).byteValue();
167      keys.add(cp);
168    }
169    return keys.toArray(new byte[][] { new byte[] {} });
170  }
171
172  /**
173   * This is for testing the active number of threads that were used while doing a batch operation.
174   * It inserts one row per region via the batch operation, and then checks the number of active
175   * threads.
176   * <p/>
177   * For HBASE-3553
178   */
179  @Test
180  public void testActiveThreadsCount() throws Exception {
181    UTIL.getConfiguration().setLong("hbase.htable.threads.coresize", slaves + 1);
182    // Make sure max is at least as big as coresize; can be smaller in test context where
183    // we tune down thread sizes -- max could be < slaves + 1.
184    UTIL.getConfiguration().setLong("hbase.htable.threads.max", slaves + 1);
185    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) {
186      ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration());
187      try {
188        try (Table t = connection.getTable(TEST_TABLE, executor)) {
189          List<Put> puts = constructPutRequests(); // creates a Put for every region
190          t.batch(puts, null);
191          HashSet<ServerName> regionservers = new HashSet<>();
192          try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) {
193            for (Row r : puts) {
194              HRegionLocation location = locator.getRegionLocation(r.getRow());
195              regionservers.add(location.getServerName());
196            }
197          }
198          assertEquals(regionservers.size(), executor.getLargestPoolSize());
199        }
200      } finally {
201        executor.shutdownNow();
202      }
203    }
204  }
205
206  @Test
207  public void testBatchWithGet() throws Exception {
208    LOG.info("test=testBatchWithGet");
209    Table table = UTIL.getConnection().getTable(TEST_TABLE);
210
211    // load test data
212    List<Put> puts = constructPutRequests();
213    table.batch(puts, null);
214
215    // create a list of gets and run it
216    List<Row> gets = new ArrayList<>();
217    for (byte[] k : KEYS) {
218      Get get = new Get(k);
219      get.addColumn(BYTES_FAMILY, QUALIFIER);
220      gets.add(get);
221    }
222    Result[] multiRes = new Result[gets.size()];
223    table.batch(gets, multiRes);
224
225    // Same gets using individual call API
226    List<Result> singleRes = new ArrayList<>();
227    for (Row get : gets) {
228      singleRes.add(table.get((Get) get));
229    }
230    // Compare results
231    Assert.assertEquals(singleRes.size(), multiRes.length);
232    for (int i = 0; i < singleRes.size(); i++) {
233      Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
234      Cell[] singleKvs = singleRes.get(i).rawCells();
235      Cell[] multiKvs = multiRes[i].rawCells();
236      for (int j = 0; j < singleKvs.length; j++) {
237        Assert.assertEquals(singleKvs[j], multiKvs[j]);
238        Assert.assertEquals(0,
239          Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), CellUtil.cloneValue(multiKvs[j])));
240      }
241    }
242    table.close();
243  }
244
245  @Test
246  public void testBadFam() throws Exception {
247    LOG.info("test=testBadFam");
248    Table table = UTIL.getConnection().getTable(TEST_TABLE);
249
250    List<Row> actions = new ArrayList<>();
251    Put p = new Put(Bytes.toBytes("row1"));
252    p.addColumn(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
253    actions.add(p);
254    p = new Put(Bytes.toBytes("row2"));
255    p.addColumn(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
256    actions.add(p);
257
258    // row1 and row2 should be in the same region.
259
260    Object[] r = new Object[actions.size()];
261    try {
262      table.batch(actions, r);
263      fail();
264    } catch (RetriesExhaustedWithDetailsException ex) {
265      LOG.debug(ex.toString(), ex);
266      // good!
267      assertFalse(ex.mayHaveClusterIssues());
268    }
269    assertEquals(2, r.length);
270    assertTrue(r[0] instanceof Throwable);
271    assertTrue(r[1] instanceof Result);
272    table.close();
273  }
274
275  @Test
276  public void testFlushCommitsNoAbort() throws Exception {
277    LOG.info("test=testFlushCommitsNoAbort");
278    doTestFlushCommits(false);
279  }
280
281  /**
282   * Only run one Multi test with a forced RegionServer abort. Otherwise, the unit tests will take
283   * an unnecessarily long time to run.
284   */
285  @Test
286  public void testFlushCommitsWithAbort() throws Exception {
287    LOG.info("test=testFlushCommitsWithAbort");
288    doTestFlushCommits(true);
289  }
290
291  /**
292   * Set table auto flush to false and test flushing commits
293   * @param doAbort true if abort one regionserver in the testing
294   */
295  private void doTestFlushCommits(boolean doAbort) throws Exception {
296    // Load the data
297    LOG.info("get new table");
298    Table table = UTIL.getConnection().getTable(TEST_TABLE);
299
300    LOG.info("constructPutRequests");
301    List<Put> puts = constructPutRequests();
302    table.put(puts);
303    LOG.info("puts");
304    final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
305    assert liveRScount > 0;
306    JVMClusterUtil.RegionServerThread liveRS =
307      UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
308    if (doAbort) {
309      liveRS.getRegionServer().abort("Aborting for tests", new Exception("doTestFlushCommits"));
310      // If we wait for no regions being online after we abort the server, we
311      // could ensure the master has re-assigned the regions on killed server
312      // after writing successfully. It means the server we aborted is dead
313      // and detected by matser
314      while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
315        Thread.sleep(100);
316      }
317      // try putting more keys after the abort. same key/qual... just validating
318      // no exceptions thrown
319      puts = constructPutRequests();
320      table.put(puts);
321    }
322
323    LOG.info("validating loaded data");
324    validateLoadedData(table);
325
326    // Validate server and region count
327    List<JVMClusterUtil.RegionServerThread> liveRSs =
328      UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
329    int count = 0;
330    for (JVMClusterUtil.RegionServerThread t : liveRSs) {
331      count++;
332      LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
333    }
334    LOG.info("Count=" + count);
335    Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
336      (doAbort ? (liveRScount - 1) : liveRScount), count);
337    if (doAbort) {
338      UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
339      UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
340        @Override
341        public boolean evaluate() throws Exception {
342          // We disable regions on master so the count should be liveRScount - 1
343          return UTIL.getMiniHBaseCluster().getMaster().getClusterMetrics().getLiveServerMetrics()
344            .size() == liveRScount - 1;
345        }
346      });
347      UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
348    }
349
350    table.close();
351    LOG.info("done");
352  }
353
354  @Test
355  public void testBatchWithPut() throws Exception {
356    LOG.info("test=testBatchWithPut");
357    Table table = CONNECTION.getTable(TEST_TABLE);
358    // put multiple rows using a batch
359    List<Put> puts = constructPutRequests();
360
361    Object[] results = new Object[puts.size()];
362    table.batch(puts, results);
363    validateSizeAndEmpty(results, KEYS.length);
364
365    if (true) {
366      int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
367      assert liveRScount > 0;
368      JVMClusterUtil.RegionServerThread liveRS =
369        UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
370      liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
371      puts = constructPutRequests();
372      try {
373        results = new Object[puts.size()];
374        table.batch(puts, results);
375      } catch (RetriesExhaustedWithDetailsException ree) {
376        LOG.info(ree.getExhaustiveDescription());
377        table.close();
378        throw ree;
379      }
380      validateSizeAndEmpty(results, KEYS.length);
381    }
382
383    validateLoadedData(table);
384    table.close();
385  }
386
387  @Test
388  public void testBatchWithDelete() throws Exception {
389    LOG.info("test=testBatchWithDelete");
390    Table table = UTIL.getConnection().getTable(TEST_TABLE);
391
392    // Load some data
393    List<Put> puts = constructPutRequests();
394    Object[] results = new Object[puts.size()];
395    table.batch(puts, results);
396    validateSizeAndEmpty(results, KEYS.length);
397
398    // Deletes
399    List<Row> deletes = new ArrayList<>();
400    for (int i = 0; i < KEYS.length; i++) {
401      Delete delete = new Delete(KEYS[i]);
402      delete.addFamily(BYTES_FAMILY);
403      deletes.add(delete);
404    }
405    results = new Object[deletes.size()];
406    table.batch(deletes, results);
407    validateSizeAndEmpty(results, KEYS.length);
408
409    // Get to make sure ...
410    for (byte[] k : KEYS) {
411      Get get = new Get(k);
412      get.addColumn(BYTES_FAMILY, QUALIFIER);
413      Assert.assertFalse(table.exists(get));
414    }
415    table.close();
416  }
417
418  @Test
419  public void testHTableDeleteWithList() throws Exception {
420    LOG.info("test=testHTableDeleteWithList");
421    Table table = UTIL.getConnection().getTable(TEST_TABLE);
422
423    // Load some data
424    List<Put> puts = constructPutRequests();
425    Object[] results = new Object[puts.size()];
426    table.batch(puts, results);
427    validateSizeAndEmpty(results, KEYS.length);
428
429    // Deletes
430    ArrayList<Delete> deletes = new ArrayList<>();
431    for (int i = 0; i < KEYS.length; i++) {
432      Delete delete = new Delete(KEYS[i]);
433      delete.addFamily(BYTES_FAMILY);
434      deletes.add(delete);
435    }
436    table.delete(deletes);
437    Assert.assertTrue(deletes.isEmpty());
438
439    // Get to make sure ...
440    for (byte[] k : KEYS) {
441      Get get = new Get(k);
442      get.addColumn(BYTES_FAMILY, QUALIFIER);
443      Assert.assertFalse(table.exists(get));
444    }
445    table.close();
446  }
447
448  @Test
449  public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
450    LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
451    Table table = UTIL.getConnection().getTable(TEST_TABLE);
452
453    List<Row> puts = new ArrayList<>();
454    for (int i = 0; i < 100; i++) {
455      Put put = new Put(ONE_ROW);
456      byte[] qual = Bytes.toBytes("column" + i);
457      put.addColumn(BYTES_FAMILY, qual, VALUE);
458      puts.add(put);
459    }
460    Object[] results = new Object[puts.size()];
461    table.batch(puts, results);
462
463    // validate
464    validateSizeAndEmpty(results, 100);
465
466    // get the data back and validate that it is correct
467    List<Row> gets = new ArrayList<>();
468    for (int i = 0; i < 100; i++) {
469      Get get = new Get(ONE_ROW);
470      byte[] qual = Bytes.toBytes("column" + i);
471      get.addColumn(BYTES_FAMILY, qual);
472      gets.add(get);
473    }
474
475    Object[] multiRes = new Object[gets.size()];
476    table.batch(gets, multiRes);
477
478    int idx = 0;
479    for (Object r : multiRes) {
480      byte[] qual = Bytes.toBytes("column" + idx);
481      validateResult(r, qual, VALUE);
482      idx++;
483    }
484    table.close();
485  }
486
487  @Test
488  public void testBatchWithIncrementAndAppend() throws Exception {
489    LOG.info("test=testBatchWithIncrementAndAppend");
490    final byte[] QUAL1 = Bytes.toBytes("qual1");
491    final byte[] QUAL2 = Bytes.toBytes("qual2");
492    final byte[] QUAL3 = Bytes.toBytes("qual3");
493    final byte[] QUAL4 = Bytes.toBytes("qual4");
494    Table table = UTIL.getConnection().getTable(TEST_TABLE);
495    Delete d = new Delete(ONE_ROW);
496    table.delete(d);
497    Put put = new Put(ONE_ROW);
498    put.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
499    put.addColumn(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
500    table.put(put);
501
502    Increment inc = new Increment(ONE_ROW);
503    inc.addColumn(BYTES_FAMILY, QUAL2, 1);
504    inc.addColumn(BYTES_FAMILY, QUAL3, 1);
505
506    Append a = new Append(ONE_ROW);
507    a.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
508    a.addColumn(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
509    List<Row> actions = new ArrayList<>();
510    actions.add(inc);
511    actions.add(a);
512
513    Object[] multiRes = new Object[actions.size()];
514    table.batch(actions, multiRes);
515    validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
516    validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
517    validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
518    validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
519    table.close();
520  }
521
522  @Test
523  public void testNonceCollision() throws Exception {
524    LOG.info("test=testNonceCollision");
525    final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
526    Table table = connection.getTable(TEST_TABLE);
527    Put put = new Put(ONE_ROW);
528    put.addColumn(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
529
530    // Replace nonce manager with the one that returns each nonce twice.
531    NonceGenerator cnm = new NonceGenerator() {
532
533      private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get();
534
535      private long lastNonce = -1;
536
537      @Override
538      public synchronized long newNonce() {
539        long nonce = 0;
540        if (lastNonce == -1) {
541          lastNonce = nonce = delegate.newNonce();
542        } else {
543          nonce = lastNonce;
544          lastNonce = -1L;
545        }
546        return nonce;
547      }
548
549      @Override
550      public long getNonceGroup() {
551        return delegate.getNonceGroup();
552      }
553    };
554
555    NonceGenerator oldCnm =
556      ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection) connection, cnm);
557
558    // First test sequential requests.
559    try {
560      Increment inc = new Increment(ONE_ROW);
561      inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
562      table.increment(inc);
563
564      // duplicate increment
565      inc = new Increment(ONE_ROW);
566      inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
567      Result result = table.increment(inc);
568      validateResult(result, QUALIFIER, Bytes.toBytes(1L));
569
570      Get get = new Get(ONE_ROW);
571      get.addColumn(BYTES_FAMILY, QUALIFIER);
572      result = table.get(get);
573      validateResult(result, QUALIFIER, Bytes.toBytes(1L));
574
575      // Now run a bunch of requests in parallel, exactly half should succeed.
576      int numRequests = 40;
577      final CountDownLatch startedLatch = new CountDownLatch(numRequests);
578      final CountDownLatch startLatch = new CountDownLatch(1);
579      final CountDownLatch doneLatch = new CountDownLatch(numRequests);
580      for (int i = 0; i < numRequests; ++i) {
581        Runnable r = new Runnable() {
582          @Override
583          public void run() {
584            Table table = null;
585            try {
586              table = connection.getTable(TEST_TABLE);
587            } catch (IOException e) {
588              fail("Not expected");
589            }
590            Increment inc = new Increment(ONE_ROW);
591            inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
592            startedLatch.countDown();
593            try {
594              startLatch.await();
595            } catch (InterruptedException e) {
596              fail("Not expected");
597            }
598            try {
599              table.increment(inc);
600            } catch (IOException ioEx) {
601              fail("Not expected");
602            }
603            doneLatch.countDown();
604          }
605        };
606        Threads.setDaemonThreadRunning(new Thread(r));
607      }
608      startedLatch.await(); // Wait until all threads are ready...
609      startLatch.countDown(); // ...and unleash the herd!
610      doneLatch.await();
611      // Now verify
612      get = new Get(ONE_ROW);
613      get.addColumn(BYTES_FAMILY, QUALIFIER);
614      result = table.get(get);
615      validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
616      table.close();
617    } finally {
618      ConnectionImplementation.injectNonceGeneratorForTesting((ClusterConnection) connection,
619        oldCnm);
620    }
621  }
622
623  @Test
624  public void testBatchWithMixedActions() throws Exception {
625    LOG.info("test=testBatchWithMixedActions");
626    Table table = UTIL.getConnection().getTable(TEST_TABLE);
627
628    // Load some data to start
629    List<Put> puts = constructPutRequests();
630    Object[] results = new Object[puts.size()];
631    table.batch(puts, results);
632    validateSizeAndEmpty(results, KEYS.length);
633
634    // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
635    // put
636    List<Row> actions = new ArrayList<>();
637
638    byte[] qual2 = Bytes.toBytes("qual2");
639    byte[] val2 = Bytes.toBytes("putvalue2");
640
641    // 0 get
642    Get get = new Get(KEYS[10]);
643    get.addColumn(BYTES_FAMILY, QUALIFIER);
644    actions.add(get);
645
646    // 1 get
647    get = new Get(KEYS[11]);
648    get.addColumn(BYTES_FAMILY, QUALIFIER);
649    actions.add(get);
650
651    // 2 put of new column
652    Put put = new Put(KEYS[10]);
653    put.addColumn(BYTES_FAMILY, qual2, val2);
654    actions.add(put);
655
656    // 3 delete
657    Delete delete = new Delete(KEYS[20]);
658    delete.addFamily(BYTES_FAMILY);
659    actions.add(delete);
660
661    // 4 get
662    get = new Get(KEYS[30]);
663    get.addColumn(BYTES_FAMILY, QUALIFIER);
664    actions.add(get);
665
666    // There used to be a 'get' of a previous put here, but removed
667    // since this API really cannot guarantee order in terms of mixed
668    // get/puts.
669
670    // 5 put of new column
671    put = new Put(KEYS[40]);
672    put.addColumn(BYTES_FAMILY, qual2, val2);
673    actions.add(put);
674
675    // 6 RowMutations
676    RowMutations rm = new RowMutations(KEYS[50]);
677    put = new Put(KEYS[50]);
678    put.addColumn(BYTES_FAMILY, qual2, val2);
679    rm.add((Mutation) put);
680    byte[] qual3 = Bytes.toBytes("qual3");
681    byte[] val3 = Bytes.toBytes("putvalue3");
682    put = new Put(KEYS[50]);
683    put.addColumn(BYTES_FAMILY, qual3, val3);
684    rm.add((Mutation) put);
685    actions.add(rm);
686
687    // 7 Add another Get to the mixed sequence after RowMutations
688    get = new Get(KEYS[10]);
689    get.addColumn(BYTES_FAMILY, QUALIFIER);
690    actions.add(get);
691
692    results = new Object[actions.size()];
693    table.batch(actions, results);
694
695    // Validation
696
697    validateResult(results[0]);
698    validateResult(results[1]);
699    validateEmpty(results[3]);
700    validateResult(results[4]);
701    validateEmpty(results[5]);
702    validateEmpty(results[6]);
703    validateResult(results[7]);
704
705    // validate last put, externally from the batch
706    get = new Get(KEYS[40]);
707    get.addColumn(BYTES_FAMILY, qual2);
708    Result r = table.get(get);
709    validateResult(r, qual2, val2);
710
711    // validate last RowMutations, externally from the batch
712    get = new Get(KEYS[50]);
713    get.addColumn(BYTES_FAMILY, qual2);
714    r = table.get(get);
715    validateResult(r, qual2, val2);
716
717    get = new Get(KEYS[50]);
718    get.addColumn(BYTES_FAMILY, qual3);
719    r = table.get(get);
720    validateResult(r, qual3, val3);
721
722    table.close();
723  }
724
725  // // Helper methods ////
726
727  private void validateResult(Object r) {
728    validateResult(r, QUALIFIER, VALUE);
729  }
730
731  private void validateResult(Object r1, byte[] qual, byte[] val) {
732    Result r = (Result) r1;
733    Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
734    byte[] value = r.getValue(BYTES_FAMILY, qual);
735    if (0 != Bytes.compareTo(val, value)) {
736      fail("Expected [" + Bytes.toStringBinary(val) + "] but got [" + Bytes.toStringBinary(value)
737        + "]");
738    }
739  }
740
741  private List<Put> constructPutRequests() {
742    List<Put> puts = new ArrayList<>();
743    for (byte[] k : KEYS) {
744      Put put = new Put(k);
745      put.addColumn(BYTES_FAMILY, QUALIFIER, VALUE);
746      puts.add(put);
747    }
748    return puts;
749  }
750
751  private void validateLoadedData(Table table) throws IOException {
752    // get the data back and validate that it is correct
753    LOG.info("Validating data on " + table);
754    List<Get> gets = new ArrayList<>();
755    for (byte[] k : KEYS) {
756      Get get = new Get(k);
757      get.addColumn(BYTES_FAMILY, QUALIFIER);
758      gets.add(get);
759    }
760    int retryNum = 10;
761    Result[] results = null;
762    do {
763      results = table.get(gets);
764      boolean finished = true;
765      for (Result result : results) {
766        if (result.isEmpty()) {
767          finished = false;
768          break;
769        }
770      }
771      if (finished) {
772        break;
773      }
774      try {
775        Thread.sleep(10);
776      } catch (InterruptedException e) {
777      }
778      retryNum--;
779    } while (retryNum > 0);
780
781    if (retryNum == 0) {
782      fail("Timeout for validate data");
783    } else {
784      if (results != null) {
785        for (Result r : results) {
786          Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
787          Assert.assertEquals(0, Bytes.compareTo(VALUE, r.getValue(BYTES_FAMILY, QUALIFIER)));
788        }
789        LOG.info("Validating data on " + table + " successfully!");
790      }
791    }
792  }
793
794  private void validateEmpty(Object r1) {
795    Result result = (Result) r1;
796    Assert.assertTrue(result != null);
797    Assert.assertTrue(result.isEmpty());
798  }
799
800  private void validateSizeAndEmpty(Object[] results, int expectedSize) {
801    // Validate got back the same number of Result objects, all empty
802    Assert.assertEquals(expectedSize, results.length);
803    for (Object result : results) {
804      validateEmpty(result);
805    }
806  }
807
808  public static class MyMasterObserver implements MasterObserver, MasterCoprocessor {
809    private static final AtomicInteger postBalanceCount = new AtomicInteger(0);
810    private static final AtomicBoolean start = new AtomicBoolean(false);
811
812    @Override
813    public void start(CoprocessorEnvironment env) throws IOException {
814      start.set(true);
815    }
816
817    @Override
818    public Optional<MasterObserver> getMasterObserver() {
819      return Optional.of(this);
820    }
821
822    @Override
823    public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx,
824      BalanceRequest request, List<RegionPlan> plans) throws IOException {
825      if (!plans.isEmpty()) {
826        postBalanceCount.incrementAndGet();
827      }
828    }
829  }
830}