001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Optional;
032import java.util.Random;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicInteger;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.Coprocessor;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HColumnDescriptor;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.HTableDescriptor;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
051import org.apache.hadoop.hbase.coprocessor.ObserverContext;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
054import org.apache.hadoop.hbase.coprocessor.RegionObserver;
055import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
056import org.apache.hadoop.hbase.ipc.ServerRpcController;
057import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
058import org.apache.hadoop.hbase.regionserver.HRegion;
059import org.apache.hadoop.hbase.regionserver.HRegionServer;
060import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
061import org.apache.hadoop.hbase.regionserver.RegionScanner;
062import org.apache.hadoop.hbase.testclassification.ClientTests;
063import org.apache.hadoop.hbase.testclassification.LargeTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.Pair;
066import org.junit.After;
067import org.junit.AfterClass;
068import org.junit.Assert;
069import org.junit.Before;
070import org.junit.BeforeClass;
071import org.junit.ClassRule;
072import org.junit.Rule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.junit.rules.TestName;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
081
082@Category({LargeTests.class, ClientTests.class})
083public class TestFromClientSide3 {
084
085  @ClassRule
086  public static final HBaseClassTestRule CLASS_RULE =
087      HBaseClassTestRule.forClass(TestFromClientSide3.class);
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class);
090  private final static HBaseTestingUtility TEST_UTIL
091    = new HBaseTestingUtility();
092  private static final int WAITTABLE_MILLIS = 10000;
093  private static byte[] FAMILY = Bytes.toBytes("testFamily");
094  private static Random random = new Random();
095  private static int SLAVES = 3;
096  private static final byte[] ROW = Bytes.toBytes("testRow");
097  private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow");
098  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
099  private static final byte[] VALUE = Bytes.toBytes("testValue");
100  private static final byte[] COL_QUAL = Bytes.toBytes("f1");
101  private static final byte[] VAL_BYTES = Bytes.toBytes("v1");
102  private static final byte[] ROW_BYTES = Bytes.toBytes("r1");
103
104  @Rule
105  public TestName name = new TestName();
106  private TableName tableName;
107
108  /**
109   * @throws java.lang.Exception
110   */
111  @BeforeClass
112  public static void setUpBeforeClass() throws Exception {
113    TEST_UTIL.startMiniCluster(SLAVES);
114  }
115
116  /**
117   * @throws java.lang.Exception
118   */
119  @AfterClass
120  public static void tearDownAfterClass() throws Exception {
121    TEST_UTIL.shutdownMiniCluster();
122  }
123
124  /**
125   * @throws java.lang.Exception
126   */
127  @Before
128  public void setUp() throws Exception {
129    tableName = TableName.valueOf(name.getMethodName());
130  }
131
132  /**
133   * @throws java.lang.Exception
134   */
135  @After
136  public void tearDown() throws Exception {
137    for (HTableDescriptor htd: TEST_UTIL.getAdmin().listTables()) {
138      LOG.info("Tear down, remove table=" + htd.getTableName());
139      TEST_UTIL.deleteTable(htd.getTableName());
140  }
141  }
142
143  private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts)
144      throws Exception {
145    Put put = new Put(row);
146    for (int i = 0; i < nPuts; i++) {
147      byte[] qualifier = Bytes.toBytes(random.nextInt());
148      byte[] value = Bytes.toBytes(random.nextInt());
149      put.addColumn(family, qualifier, value);
150    }
151    table.put(put);
152  }
153
154  private void performMultiplePutAndFlush(HBaseAdmin admin, Table table,
155      byte[] row, byte[] family, int nFlushes, int nPuts)
156  throws Exception {
157
158    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(table.getName())) {
159      // connection needed for poll-wait
160      HRegionLocation loc = locator.getRegionLocation(row, true);
161      AdminProtos.AdminService.BlockingInterface server =
162        ((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName());
163      byte[] regName = loc.getRegionInfo().getRegionName();
164
165      for (int i = 0; i < nFlushes; i++) {
166        randomCFPuts(table, row, family, nPuts);
167        List<String> sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY);
168        int sfCount = sf.size();
169
170        admin.flush(table.getName());
171      }
172    }
173  }
174
175  private static List<Cell> toList(ResultScanner scanner) {
176    try {
177      List<Cell> cells = new ArrayList<>();
178      for (Result r : scanner) {
179        cells.addAll(r.listCells());
180      }
181      return cells;
182    } finally {
183      scanner.close();
184    }
185  }
186
187  @Test
188  public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException {
189    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
190      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
191      byte[] row = Bytes.toBytes("SpecifiedRow");
192      byte[] value0 = Bytes.toBytes("value_0");
193      byte[] value1 = Bytes.toBytes("value_1");
194      Put put = new Put(row);
195      put.addColumn(FAMILY, QUALIFIER, VALUE);
196      table.put(put);
197      Delete d = new Delete(row);
198      table.delete(d);
199      put = new Put(row);
200      put.addColumn(FAMILY, null, value0);
201      table.put(put);
202      put = new Put(row);
203      put.addColumn(FAMILY, null, value1);
204      table.put(put);
205      List<Cell> cells = toList(table.getScanner(new Scan()));
206      assertEquals(1, cells.size());
207      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
208
209      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
210      assertEquals(1, cells.size());
211      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
212
213      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
214      assertEquals(0, cells.size());
215
216      TEST_UTIL.getAdmin().flush(tableName);
217      cells = toList(table.getScanner(new Scan()));
218      assertEquals(1, cells.size());
219      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
220
221      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
222      assertEquals(1, cells.size());
223      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
224
225      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
226      assertEquals(0, cells.size());
227    }
228  }
229
230  @Test
231  public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException {
232    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
233      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
234      byte[] row = Bytes.toBytes("SpecifiedRow");
235      byte[] qual0 = Bytes.toBytes("qual0");
236      byte[] qual1 = Bytes.toBytes("qual1");
237      long now = System.currentTimeMillis();
238      Delete d = new Delete(row, now);
239      table.delete(d);
240
241      Put put = new Put(row);
242      put.addColumn(FAMILY, null, now + 1, VALUE);
243      table.put(put);
244
245      put = new Put(row);
246      put.addColumn(FAMILY, qual1, now + 2, qual1);
247      table.put(put);
248
249      put = new Put(row);
250      put.addColumn(FAMILY, qual0, now + 3, qual0);
251      table.put(put);
252
253      Result r = table.get(new Get(row));
254      assertEquals(r.toString(), 3, r.size());
255      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
256      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
257      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
258
259      TEST_UTIL.getAdmin().flush(tableName);
260      r = table.get(new Get(row));
261      assertEquals(3, r.size());
262      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
263      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
264      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
265    }
266  }
267
268  // override the config settings at the CF level and ensure priority
269  @Test
270  public void testAdvancedConfigOverride() throws Exception {
271    /*
272     * Overall idea: (1) create 3 store files and issue a compaction. config's
273     * compaction.min == 3, so should work. (2) Increase the compaction.min
274     * toggle in the HTD to 5 and modify table. If we use the HTD value instead
275     * of the default config value, adding 3 files and issuing a compaction
276     * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2
277     * and modify table. The CF schema should override the Table schema and now
278     * cause a minor compaction.
279     */
280    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
281
282    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
283      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
284      try (Admin admin = TEST_UTIL.getAdmin()) {
285        ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection();
286
287        // Create 3 store files.
288        byte[] row = Bytes.toBytes(random.nextInt());
289        performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 100);
290
291        try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
292          // Verify we have multiple store files.
293          HRegionLocation loc = locator.getRegionLocation(row, true);
294          byte[] regionName = loc.getRegionInfo().getRegionName();
295          AdminProtos.AdminService.BlockingInterface server =
296                  connection.getAdmin(loc.getServerName());
297          assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1);
298
299          // Issue a compaction request
300          admin.compact(tableName);
301
302          // poll wait for the compactions to happen
303          for (int i = 0; i < 10 * 1000 / 40; ++i) {
304            // The number of store files after compaction should be lesser.
305            loc = locator.getRegionLocation(row, true);
306            if (!loc.getRegionInfo().isOffline()) {
307              regionName = loc.getRegionInfo().getRegionName();
308              server = connection.getAdmin(loc.getServerName());
309              if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1) {
310                break;
311              }
312            }
313            Thread.sleep(40);
314          }
315          // verify the compactions took place and that we didn't just time out
316          assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1);
317
318          // change the compaction.min config option for this table to 5
319          LOG.info("hbase.hstore.compaction.min should now be 5");
320          HTableDescriptor htd = new HTableDescriptor(table.getTableDescriptor());
321          htd.setValue("hbase.hstore.compaction.min", String.valueOf(5));
322          admin.modifyTable(tableName, htd);
323          Pair<Integer, Integer> st = admin.getAlterStatus(tableName);
324          while (null != st && st.getFirst() > 0) {
325            LOG.debug(st.getFirst() + " regions left to update");
326            Thread.sleep(40);
327            st = admin.getAlterStatus(tableName);
328          }
329          LOG.info("alter status finished");
330
331          // Create 3 more store files.
332          performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 10);
333
334          // Issue a compaction request
335          admin.compact(tableName);
336
337          // This time, the compaction request should not happen
338          Thread.sleep(10 * 1000);
339          loc = locator.getRegionLocation(row, true);
340          regionName = loc.getRegionInfo().getRegionName();
341          server = connection.getAdmin(loc.getServerName());
342          int sfCount = ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size();
343          assertTrue(sfCount > 1);
344
345          // change an individual CF's config option to 2 & online schema update
346          LOG.info("hbase.hstore.compaction.min should now be 2");
347          HColumnDescriptor hcd = new HColumnDescriptor(htd.getFamily(FAMILY));
348          hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2));
349          htd.modifyFamily(hcd);
350          admin.modifyTable(tableName, htd);
351          st = admin.getAlterStatus(tableName);
352          while (null != st && st.getFirst() > 0) {
353            LOG.debug(st.getFirst() + " regions left to update");
354            Thread.sleep(40);
355            st = admin.getAlterStatus(tableName);
356          }
357          LOG.info("alter status finished");
358
359          // Issue a compaction request
360          admin.compact(tableName);
361
362          // poll wait for the compactions to happen
363          for (int i = 0; i < 10 * 1000 / 40; ++i) {
364            loc = locator.getRegionLocation(row, true);
365            regionName = loc.getRegionInfo().getRegionName();
366            try {
367              server = connection.getAdmin(loc.getServerName());
368              if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount) {
369                break;
370              }
371            } catch (Exception e) {
372              LOG.debug("Waiting for region to come online: " + Bytes.toString(regionName));
373            }
374            Thread.sleep(40);
375          }
376
377          // verify the compaction took place and that we didn't just time out
378          assertTrue(ProtobufUtil.getStoreFiles(
379                  server, regionName, FAMILY).size() < sfCount);
380
381          // Finally, ensure that we can remove a custom config value after we made it
382          LOG.info("Removing CF config value");
383          LOG.info("hbase.hstore.compaction.min should now be 5");
384          hcd = new HColumnDescriptor(htd.getFamily(FAMILY));
385          hcd.setValue("hbase.hstore.compaction.min", null);
386          htd.modifyFamily(hcd);
387          admin.modifyTable(tableName, htd);
388          st = admin.getAlterStatus(tableName);
389          while (null != st && st.getFirst() > 0) {
390            LOG.debug(st.getFirst() + " regions left to update");
391            Thread.sleep(40);
392            st = admin.getAlterStatus(tableName);
393          }
394          LOG.info("alter status finished");
395          assertNull(table.getTableDescriptor().getFamily(FAMILY).getValue(
396                  "hbase.hstore.compaction.min"));
397        }
398      }
399    }
400  }
401
402  @Test
403  public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException {
404    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
405      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
406      List actions = (List) new ArrayList();
407      Object[] results = new Object[2];
408      // create an empty Put
409      Put put1 = new Put(ROW);
410      actions.add(put1);
411
412      Put put2 = new Put(ANOTHERROW);
413      put2.addColumn(FAMILY, QUALIFIER, VALUE);
414      actions.add(put2);
415
416      table.batch(actions, results);
417      fail("Empty Put should have failed the batch call");
418    } catch (IllegalArgumentException iae) {
419    }
420  }
421
422  // Test Table.batch with large amount of mutations against the same key.
423  // It used to trigger read lock's "Maximum lock count exceeded" Error.
424  @Test
425  public void testHTableWithLargeBatch() throws IOException, InterruptedException {
426    int sixtyFourK = 64 * 1024;
427    List actions = new ArrayList();
428    Object[] results = new Object[(sixtyFourK + 1) * 2];
429
430    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
431      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
432
433      for (int i = 0; i < sixtyFourK + 1; i++) {
434        Put put1 = new Put(ROW);
435        put1.addColumn(FAMILY, QUALIFIER, VALUE);
436        actions.add(put1);
437
438        Put put2 = new Put(ANOTHERROW);
439        put2.addColumn(FAMILY, QUALIFIER, VALUE);
440        actions.add(put2);
441      }
442
443      table.batch(actions, results);
444    }
445  }
446
447  @Test
448  public void testBatchWithRowMutation() throws Exception {
449    LOG.info("Starting testBatchWithRowMutation");
450    byte [][] QUALIFIERS = new byte [][] {
451      Bytes.toBytes("a"), Bytes.toBytes("b")
452    };
453
454    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
455      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
456
457      RowMutations arm = RowMutations.of(Collections.singletonList(
458              new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE)));
459      Object[] batchResult = new Object[1];
460      table.batch(Arrays.asList(arm), batchResult);
461
462      Get g = new Get(ROW);
463      Result r = table.get(g);
464      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
465
466      arm = RowMutations.of(Arrays.asList(
467              new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE),
468              new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0])));
469      table.batch(Arrays.asList(arm), batchResult);
470      r = table.get(g);
471      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
472      assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
473
474      // Test that we get the correct remote exception for RowMutations from batch()
475      try {
476        arm = RowMutations.of(Collections.singletonList(
477                new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE)));
478        table.batch(Arrays.asList(arm), batchResult);
479        fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
480      } catch (RetriesExhaustedWithDetailsException e) {
481        String msg = e.getMessage();
482        assertTrue(msg.contains("NoSuchColumnFamilyException"));
483      }
484    }
485  }
486
487  @Test
488  public void testBatchWithCheckAndMutate() throws Exception {
489    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
490      byte[] row1 = Bytes.toBytes("row1");
491      byte[] row2 = Bytes.toBytes("row2");
492      byte[] row3 = Bytes.toBytes("row3");
493      byte[] row4 = Bytes.toBytes("row4");
494      byte[] row5 = Bytes.toBytes("row5");
495      byte[] row6 = Bytes.toBytes("row6");
496      byte[] row7 = Bytes.toBytes("row7");
497
498      table.put(Arrays.asList(
499        new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
500        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
501        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
502        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
503        new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
504        new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
505        new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))));
506
507      CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
508        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
509        .build(new RowMutations(row1)
510          .add((Mutation) new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
511          .add((Mutation) new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
512          .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
513          .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
514      Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
515      RowMutations mutations = new RowMutations(row3)
516        .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
517        .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
518        .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
519        .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
520      CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
521        .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
522        .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
523      Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
524      CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6)
525        .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
526        .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
527      CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7)
528        .ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
529        .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
530
531      List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
532        checkAndMutate3, checkAndMutate4);
533      Object[] results = new Object[actions.size()];
534      table.batch(actions, results);
535
536      CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0];
537      assertTrue(checkAndMutateResult.isSuccess());
538      assertEquals(3L,
539        Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
540      assertEquals("d",
541        Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
542
543      assertEquals("b",
544        Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
545
546      Result result = (Result) results[2];
547      assertTrue(result.getExists());
548      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
549      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
550
551      checkAndMutateResult = (CheckAndMutateResult) results[3];
552      assertFalse(checkAndMutateResult.isSuccess());
553      assertNull(checkAndMutateResult.getResult());
554
555      assertTrue(((Result) results[4]).isEmpty());
556
557      checkAndMutateResult = (CheckAndMutateResult) results[5];
558      assertTrue(checkAndMutateResult.isSuccess());
559      assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
560        .getValue(FAMILY, Bytes.toBytes("F"))));
561
562      checkAndMutateResult = (CheckAndMutateResult) results[6];
563      assertTrue(checkAndMutateResult.isSuccess());
564      assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
565        .getValue(FAMILY, Bytes.toBytes("G"))));
566
567      result = table.get(new Get(row1));
568      assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
569      assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
570      assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
571      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
572
573      result = table.get(new Get(row3));
574      assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
575      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
576      assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
577      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
578      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
579
580      result = table.get(new Get(row4));
581      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
582
583      result = table.get(new Get(row5));
584      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
585
586      result = table.get(new Get(row6));
587      assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
588
589      result = table.get(new Get(row7));
590      assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
591    }
592  }
593
594  @Test
595  public void testHTableExistsMethodSingleRegionSingleGet()
596          throws IOException, InterruptedException {
597    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
598      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
599
600      // Test with a single region table.
601      Put put = new Put(ROW);
602      put.addColumn(FAMILY, QUALIFIER, VALUE);
603
604      Get get = new Get(ROW);
605
606      boolean exist = table.exists(get);
607      assertFalse(exist);
608
609      table.put(put);
610
611      exist = table.exists(get);
612      assertTrue(exist);
613    }
614  }
615
616  @Test
617  public void testHTableExistsMethodSingleRegionMultipleGets()
618          throws IOException, InterruptedException {
619    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
620      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
621
622      Put put = new Put(ROW);
623      put.addColumn(FAMILY, QUALIFIER, VALUE);
624      table.put(put);
625
626      List<Get> gets = new ArrayList<>();
627      gets.add(new Get(ROW));
628      gets.add(new Get(ANOTHERROW));
629
630      boolean[] results = table.exists(gets);
631      assertTrue(results[0]);
632      assertFalse(results[1]);
633    }
634  }
635
636  @Test
637  public void testHTableExistsBeforeGet() throws IOException, InterruptedException {
638    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
639      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
640
641      Put put = new Put(ROW);
642      put.addColumn(FAMILY, QUALIFIER, VALUE);
643      table.put(put);
644
645      Get get = new Get(ROW);
646
647      boolean exist = table.exists(get);
648      assertEquals(true, exist);
649
650      Result result = table.get(get);
651      assertEquals(false, result.isEmpty());
652      assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)));
653    }
654  }
655
656  @Test
657  public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException {
658    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
659      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
660
661      final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2"));
662      Put put = new Put(ROW);
663      put.addColumn(FAMILY, QUALIFIER, VALUE);
664      table.put(put);
665      put = new Put(ROW2);
666      put.addColumn(FAMILY, QUALIFIER, VALUE);
667      table.put(put);
668
669      Get get = new Get(ROW);
670      Get get2 = new Get(ROW2);
671      ArrayList<Get> getList = new ArrayList(2);
672      getList.add(get);
673      getList.add(get2);
674
675      boolean[] exists = table.existsAll(getList);
676      assertEquals(true, exists[0]);
677      assertEquals(true, exists[1]);
678
679      Result[] result = table.get(getList);
680      assertEquals(false, result[0].isEmpty());
681      assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER)));
682      assertEquals(false, result[1].isEmpty());
683      assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER)));
684    }
685  }
686
687  @Test
688  public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
689    try (Table table = TEST_UTIL.createTable(
690      tableName, new byte[][] { FAMILY },
691      1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
692      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
693
694      Put put = new Put(ROW);
695      put.addColumn(FAMILY, QUALIFIER, VALUE);
696
697      Get get = new Get(ROW);
698
699      boolean exist = table.exists(get);
700      assertFalse(exist);
701
702      table.put(put);
703
704      exist = table.exists(get);
705      assertTrue(exist);
706    }
707  }
708
709  @Test
710  public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
711    try (Table table = TEST_UTIL.createTable(
712      tableName,
713      new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
714      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
715
716      Put put = new Put(ROW);
717      put.addColumn(FAMILY, QUALIFIER, VALUE);
718      table.put(put);
719
720      List<Get> gets = new ArrayList<>();
721      gets.add(new Get(ANOTHERROW));
722      gets.add(new Get(Bytes.add(ROW, new byte[]{0x00})));
723      gets.add(new Get(ROW));
724      gets.add(new Get(Bytes.add(ANOTHERROW, new byte[]{0x00})));
725
726      LOG.info("Calling exists");
727      boolean[] results = table.existsAll(gets);
728      assertFalse(results[0]);
729      assertFalse(results[1]);
730      assertTrue(results[2]);
731      assertFalse(results[3]);
732
733      // Test with the first region.
734      put = new Put(new byte[]{0x00});
735      put.addColumn(FAMILY, QUALIFIER, VALUE);
736      table.put(put);
737
738      gets = new ArrayList<>();
739      gets.add(new Get(new byte[]{0x00}));
740      gets.add(new Get(new byte[]{0x00, 0x00}));
741      results = table.existsAll(gets);
742      assertTrue(results[0]);
743      assertFalse(results[1]);
744
745      // Test with the last region
746      put = new Put(new byte[]{(byte) 0xff, (byte) 0xff});
747      put.addColumn(FAMILY, QUALIFIER, VALUE);
748      table.put(put);
749
750      gets = new ArrayList<>();
751      gets.add(new Get(new byte[]{(byte) 0xff}));
752      gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff}));
753      gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff}));
754      results = table.existsAll(gets);
755      assertFalse(results[0]);
756      assertTrue(results[1]);
757      assertFalse(results[2]);
758    }
759  }
760
761  @Test
762  public void testGetEmptyRow() throws Exception {
763    //Create a table and put in 1 row
764    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
765      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
766
767      Put put = new Put(ROW_BYTES);
768      put.addColumn(FAMILY, COL_QUAL, VAL_BYTES);
769      table.put(put);
770
771      //Try getting the row with an empty row key
772      Result res = null;
773      try {
774        res = table.get(new Get(new byte[0]));
775        fail();
776      } catch (IllegalArgumentException e) {
777        // Expected.
778      }
779      assertTrue(res == null);
780      res = table.get(new Get(Bytes.toBytes("r1-not-exist")));
781      assertTrue(res.isEmpty() == true);
782      res = table.get(new Get(ROW_BYTES));
783      assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
784    }
785  }
786
787  @Test
788  public void testConnectionDefaultUsesCodec() throws Exception {
789    ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection();
790    assertTrue(con.hasCellBlockSupport());
791  }
792
793  @Test
794  public void testPutWithPreBatchMutate() throws Exception {
795    testPreBatchMutate(tableName, () -> {
796      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
797        Put put = new Put(ROW);
798        put.addColumn(FAMILY, QUALIFIER, VALUE);
799        t.put(put);
800      } catch (IOException ex) {
801        throw new RuntimeException(ex);
802      }
803    });
804  }
805
806  @Test
807  public void testRowMutationsWithPreBatchMutate() throws Exception {
808    testPreBatchMutate(tableName, () -> {
809      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
810        RowMutations rm = new RowMutations(ROW, 1);
811        Put put = new Put(ROW);
812        put.addColumn(FAMILY, QUALIFIER, VALUE);
813        rm.add(put);
814        t.mutateRow(rm);
815      } catch (IOException ex) {
816        throw new RuntimeException(ex);
817      }
818    });
819  }
820
821  private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception {
822    HTableDescriptor desc = new HTableDescriptor(tableName);
823    desc.addCoprocessor(WaitingForScanObserver.class.getName());
824    desc.addFamily(new HColumnDescriptor(FAMILY));
825    TEST_UTIL.getAdmin().createTable(desc);
826    // Don't use waitTableAvailable(), because the scanner will mess up the co-processor
827
828    ExecutorService service = Executors.newFixedThreadPool(2);
829    service.execute(rn);
830    final List<Cell> cells = new ArrayList<>();
831    service.execute(() -> {
832      try {
833        // waiting for update.
834        TimeUnit.SECONDS.sleep(3);
835        try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
836          Scan scan = new Scan();
837          try (ResultScanner scanner = t.getScanner(scan)) {
838            for (Result r : scanner) {
839              cells.addAll(Arrays.asList(r.rawCells()));
840            }
841          }
842        }
843      } catch (IOException | InterruptedException ex) {
844        throw new RuntimeException(ex);
845      }
846    });
847    service.shutdown();
848    service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
849    assertEquals("The write is blocking by RegionObserver#postBatchMutate"
850            + ", so the data is invisible to reader", 0, cells.size());
851    TEST_UTIL.deleteTable(tableName);
852  }
853
854  @Test
855  public void testLockLeakWithDelta() throws Exception, Throwable {
856    HTableDescriptor desc = new HTableDescriptor(tableName);
857    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
858    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
859    desc.addFamily(new HColumnDescriptor(FAMILY));
860    TEST_UTIL.getAdmin().createTable(desc);
861    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
862
863    // new a connection for lower retry number.
864    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
865    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
866    try (Connection con = ConnectionFactory.createConnection(copy)) {
867      HRegion region = (HRegion) find(tableName);
868      region.setTimeoutForWriteLock(10);
869      ExecutorService putService = Executors.newSingleThreadExecutor();
870      putService.execute(() -> {
871        try (Table table = con.getTable(tableName)) {
872          Put put = new Put(ROW);
873          put.addColumn(FAMILY, QUALIFIER, VALUE);
874          // the put will be blocked by WaitingForMultiMutationsObserver.
875          table.put(put);
876        } catch (IOException ex) {
877          throw new RuntimeException(ex);
878        }
879      });
880      ExecutorService appendService = Executors.newSingleThreadExecutor();
881      appendService.execute(() -> {
882        Append append = new Append(ROW);
883        append.addColumn(FAMILY, QUALIFIER, VALUE);
884        try (Table table = con.getTable(tableName)) {
885          table.append(append);
886          fail("The APPEND should fail because the target lock is blocked by previous put");
887        } catch (Exception ex) {
888        }
889      });
890      appendService.shutdown();
891      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
892      WaitingForMultiMutationsObserver observer =
893              find(tableName, WaitingForMultiMutationsObserver.class);
894      observer.latch.countDown();
895      putService.shutdown();
896      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
897      try (Table table = con.getTable(tableName)) {
898        Result r = table.get(new Get(ROW));
899        assertFalse(r.isEmpty());
900        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
901      }
902    }
903    HRegion region = (HRegion) find(tableName);
904    int readLockCount = region.getReadLockCount();
905    LOG.info("readLockCount:" + readLockCount);
906    assertEquals(0, readLockCount);
907  }
908
909  @Test
910  public void testMultiRowMutations() throws Exception, Throwable {
911    HTableDescriptor desc = new HTableDescriptor(tableName);
912    desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
913    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
914    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
915    desc.addFamily(new HColumnDescriptor(FAMILY));
916    TEST_UTIL.getAdmin().createTable(desc);
917    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
918
919    // new a connection for lower retry number.
920    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
921    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
922    try (Connection con = ConnectionFactory.createConnection(copy)) {
923      byte[] row = Bytes.toBytes("ROW-0");
924      byte[] rowLocked= Bytes.toBytes("ROW-1");
925      byte[] value0 = Bytes.toBytes("VALUE-0");
926      byte[] value1 = Bytes.toBytes("VALUE-1");
927      byte[] value2 = Bytes.toBytes("VALUE-2");
928      assertNoLocks(tableName);
929      ExecutorService putService = Executors.newSingleThreadExecutor();
930      putService.execute(() -> {
931        try (Table table = con.getTable(tableName)) {
932          Put put0 = new Put(rowLocked);
933          put0.addColumn(FAMILY, QUALIFIER, value0);
934          // the put will be blocked by WaitingForMultiMutationsObserver.
935          table.put(put0);
936        } catch (IOException ex) {
937          throw new RuntimeException(ex);
938        }
939      });
940      ExecutorService cpService = Executors.newSingleThreadExecutor();
941      cpService.execute(() -> {
942        boolean threw;
943        Put put1 = new Put(row);
944        Put put2 = new Put(rowLocked);
945        put1.addColumn(FAMILY, QUALIFIER, value1);
946        put2.addColumn(FAMILY, QUALIFIER, value2);
947        try (Table table = con.getTable(tableName)) {
948          MultiRowMutationProtos.MutateRowsRequest request
949            = MultiRowMutationProtos.MutateRowsRequest.newBuilder()
950              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
951                      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto
952                              .MutationType.PUT, put1))
953              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
954                      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto
955                              .MutationType.PUT, put2))
956              .build();
957          table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
958            ROW, ROW,
959            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
960              ServerRpcController controller = new ServerRpcController();
961              CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
962                rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
963              exe.mutateRows(controller, request, rpcCallback);
964              return rpcCallback.get();
965            });
966          threw = false;
967        } catch (Throwable ex) {
968          threw = true;
969        }
970        if (!threw) {
971          // Can't call fail() earlier because the catch would eat it.
972          fail("This cp should fail because the target lock is blocked by previous put");
973        }
974      });
975      cpService.shutdown();
976      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
977      WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class);
978      observer.latch.countDown();
979      putService.shutdown();
980      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
981      try (Table table = con.getTable(tableName)) {
982        Get g0 = new Get(row);
983        Get g1 = new Get(rowLocked);
984        Result r0 = table.get(g0);
985        Result r1 = table.get(g1);
986        assertTrue(r0.isEmpty());
987        assertFalse(r1.isEmpty());
988        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
989      }
990      assertNoLocks(tableName);
991    }
992  }
993
994  /**
995   * A test case for issue HBASE-17482
996   * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped
997   * onto cells in the append thread, a countdown latch is used to ensure that happened
998   * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698)
999   * make the seqid/mvcc acquirement in handler thread and stamping in append thread
1000   * No countdown latch to assure cells in memstore are stamped with seqid/mvcc.
1001   * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner
1002   * with a smaller readpoint can see these data, which disobey the multi version
1003   * concurrency control rules.
1004   * This test case is to reproduce this scenario.
1005   * @throws IOException
1006   */
1007  @Test
1008  public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException {
1009    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
1010      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1011      //put two row first to init the scanner
1012      Put put = new Put(Bytes.toBytes("0"));
1013      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
1014      table.put(put);
1015      put = new Put(Bytes.toBytes("00"));
1016      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
1017      table.put(put);
1018      Scan scan = new Scan();
1019      scan.setTimeRange(0, Long.MAX_VALUE);
1020      scan.setCaching(1);
1021      ResultScanner scanner = table.getScanner(scan);
1022      int rowNum = scanner.next() != null ? 1 : 0;
1023      //the started scanner shouldn't see the rows put below
1024      for (int i = 1; i < 1000; i++) {
1025        put = new Put(Bytes.toBytes(String.valueOf(i)));
1026        put.setDurability(Durability.ASYNC_WAL);
1027        put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i));
1028        table.put(put);
1029      }
1030      for (Result result : scanner) {
1031        rowNum++;
1032      }
1033      //scanner should only see two rows
1034      assertEquals(2, rowNum);
1035      scanner = table.getScanner(scan);
1036      rowNum = 0;
1037      for (Result result : scanner) {
1038        rowNum++;
1039      }
1040      // the new scanner should see all rows
1041      assertEquals(1001, rowNum);
1042    }
1043  }
1044
1045  @Test
1046  public void testPutThenGetWithMultipleThreads() throws Exception {
1047    final int THREAD_NUM = 20;
1048    final int ROUND_NUM = 10;
1049    for (int round = 0; round < ROUND_NUM; round++) {
1050      ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
1051      final AtomicInteger successCnt = new AtomicInteger(0);
1052      try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
1053        TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1054
1055        for (int i = 0; i < THREAD_NUM; i++) {
1056          final int index = i;
1057          Thread t = new Thread(new Runnable() {
1058
1059            @Override
1060            public void run() {
1061              final byte[] row = Bytes.toBytes("row-" + index);
1062              final byte[] value = Bytes.toBytes("v" + index);
1063              try {
1064                Put put = new Put(row);
1065                put.addColumn(FAMILY, QUALIFIER, value);
1066                ht.put(put);
1067                Get get = new Get(row);
1068                Result result = ht.get(get);
1069                byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
1070                if (Bytes.equals(value, returnedValue)) {
1071                  successCnt.getAndIncrement();
1072                } else {
1073                  LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
1074                          + ", returned value: "
1075                          + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
1076                }
1077              } catch (Throwable e) {
1078                // do nothing
1079              }
1080            }
1081          });
1082          threads.add(t);
1083        }
1084        for (Thread t : threads) {
1085          t.start();
1086        }
1087        for (Thread t : threads) {
1088          t.join();
1089        }
1090        assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
1091      }
1092      TEST_UTIL.deleteTable(tableName);
1093    }
1094  }
1095
1096  private static void assertNoLocks(final TableName tableName)
1097          throws IOException, InterruptedException {
1098    HRegion region = (HRegion) find(tableName);
1099    assertEquals(0, region.getLockedRows().size());
1100  }
1101  private static HRegion find(final TableName tableName)
1102      throws IOException, InterruptedException {
1103    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
1104    List<HRegion> regions = rs.getRegions(tableName);
1105    assertEquals(1, regions.size());
1106    return regions.get(0);
1107  }
1108
1109  private static <T extends RegionObserver> T find(final TableName tableName,
1110          Class<T> clz) throws IOException, InterruptedException {
1111    HRegion region = find(tableName);
1112    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
1113    assertTrue("The cp instance should be " + clz.getName()
1114            + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
1115    return clz.cast(cp);
1116  }
1117
1118  public static class WaitingForMultiMutationsObserver
1119      implements RegionCoprocessor, RegionObserver {
1120    final CountDownLatch latch = new CountDownLatch(1);
1121
1122    @Override
1123    public Optional<RegionObserver> getRegionObserver() {
1124      return Optional.of(this);
1125    }
1126
1127    @Override
1128    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1129            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1130      try {
1131        latch.await();
1132      } catch (InterruptedException ex) {
1133        throw new IOException(ex);
1134      }
1135    }
1136  }
1137
1138  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
1139    private final CountDownLatch latch = new CountDownLatch(1);
1140
1141    @Override
1142    public Optional<RegionObserver> getRegionObserver() {
1143      return Optional.of(this);
1144    }
1145
1146    @Override
1147    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1148            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1149      try {
1150        // waiting for scanner
1151        latch.await();
1152      } catch (InterruptedException ex) {
1153        throw new IOException(ex);
1154      }
1155    }
1156
1157    @Override
1158    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
1159            final Scan scan, final RegionScanner s) throws IOException {
1160      latch.countDown();
1161      return s;
1162    }
1163  }
1164
1165  static byte[] generateHugeValue(int size) {
1166    Random rand = ThreadLocalRandom.current();
1167    byte[] value = new byte[size];
1168    for (int i = 0; i < value.length; i++) {
1169      value[i] = (byte) rand.nextInt(256);
1170    }
1171    return value;
1172  }
1173
1174  @Test
1175  public void testScanWithBatchSizeReturnIncompleteCells()
1176          throws IOException, InterruptedException {
1177    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
1178      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
1179      .build();
1180    try (Table table = TEST_UTIL.createTable(hd, null)) {
1181      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1182
1183      Put put = new Put(ROW);
1184      put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
1185      table.put(put);
1186
1187      put = new Put(ROW);
1188      put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
1189      table.put(put);
1190
1191      for (int i = 2; i < 5; i++) {
1192        for (int version = 0; version < 2; version++) {
1193          put = new Put(ROW);
1194          put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
1195          table.put(put);
1196        }
1197      }
1198
1199      Scan scan = new Scan();
1200      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
1201              .setMaxResultSize(4 * 1024 * 1024);
1202      Result result;
1203      try (ResultScanner scanner = table.getScanner(scan)) {
1204        List<Result> list = new ArrayList<>();
1205        /*
1206         * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB;
1207         * The second scan rpc should return a result with 3 cells, because reach the batch limit
1208         * = 3;
1209         * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
1210         * moreResultsInRegion also would be false. Finally, the client should collect all the cells
1211         * into two result: 2+3 -> 3+2;
1212         */
1213        while ((result = scanner.next()) != null) {
1214          list.add(result);
1215        }
1216
1217        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1218        Assert.assertEquals(2, list.size());
1219        Assert.assertEquals(3, list.get(0).size());
1220        Assert.assertEquals(2, list.get(1).size());
1221      }
1222
1223      scan = new Scan();
1224      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
1225              .setMaxResultSize(4 * 1024 * 1024);
1226      try (ResultScanner scanner = table.getScanner(scan)) {
1227        List<Result> list = new ArrayList<>();
1228        while ((result = scanner.next()) != null) {
1229          list.add(result);
1230        }
1231        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1232        Assert.assertEquals(3, list.size());
1233        Assert.assertEquals(2, list.get(0).size());
1234        Assert.assertEquals(2, list.get(1).size());
1235        Assert.assertEquals(1, list.get(2).size());
1236      }
1237    }
1238  }
1239}