001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Optional;
032import java.util.Random;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicInteger;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.Coprocessor;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HColumnDescriptor;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.HTableDescriptor;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
051import org.apache.hadoop.hbase.coprocessor.ObserverContext;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
054import org.apache.hadoop.hbase.coprocessor.RegionObserver;
055import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
056import org.apache.hadoop.hbase.ipc.ServerRpcController;
057import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
058import org.apache.hadoop.hbase.regionserver.HRegion;
059import org.apache.hadoop.hbase.regionserver.HRegionServer;
060import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
061import org.apache.hadoop.hbase.regionserver.RegionScanner;
062import org.apache.hadoop.hbase.testclassification.ClientTests;
063import org.apache.hadoop.hbase.testclassification.LargeTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.apache.hadoop.hbase.util.Pair;
067import org.junit.After;
068import org.junit.AfterClass;
069import org.junit.Assert;
070import org.junit.Before;
071import org.junit.BeforeClass;
072import org.junit.ClassRule;
073import org.junit.Rule;
074import org.junit.Test;
075import org.junit.experimental.categories.Category;
076import org.junit.rules.TestName;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
082
083@Category({ LargeTests.class, ClientTests.class })
084public class TestFromClientSide3 {
085
086  @ClassRule
087  public static final HBaseClassTestRule CLASS_RULE =
088    HBaseClassTestRule.forClass(TestFromClientSide3.class);
089
090  private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class);
091  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
092  private static final int WAITTABLE_MILLIS = 10000;
093  private static byte[] FAMILY = Bytes.toBytes("testFamily");
094  private static int SLAVES = 3;
095  private static final byte[] ROW = Bytes.toBytes("testRow");
096  private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow");
097  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
098  private static final byte[] VALUE = Bytes.toBytes("testValue");
099  private static final byte[] COL_QUAL = Bytes.toBytes("f1");
100  private static final byte[] VAL_BYTES = Bytes.toBytes("v1");
101  private static final byte[] ROW_BYTES = Bytes.toBytes("r1");
102
103  @Rule
104  public TestName name = new TestName();
105  private TableName tableName;
106
107  /**
108   * @throws java.lang.Exception
109   */
110  @BeforeClass
111  public static void setUpBeforeClass() throws Exception {
112    TEST_UTIL.startMiniCluster(SLAVES);
113  }
114
115  /**
116   * @throws java.lang.Exception
117   */
118  @AfterClass
119  public static void tearDownAfterClass() throws Exception {
120    TEST_UTIL.shutdownMiniCluster();
121  }
122
123  /**
124   * @throws java.lang.Exception
125   */
126  @Before
127  public void setUp() throws Exception {
128    tableName = TableName.valueOf(name.getMethodName());
129  }
130
131  /**
132   * @throws java.lang.Exception
133   */
134  @After
135  public void tearDown() throws Exception {
136    for (HTableDescriptor htd : TEST_UTIL.getAdmin().listTables()) {
137      LOG.info("Tear down, remove table=" + htd.getTableName());
138      TEST_UTIL.deleteTable(htd.getTableName());
139    }
140  }
141
142  private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts) throws Exception {
143    Put put = new Put(row);
144    Random rand = ThreadLocalRandom.current();
145    for (int i = 0; i < nPuts; i++) {
146      byte[] qualifier = Bytes.toBytes(rand.nextInt());
147      byte[] value = Bytes.toBytes(rand.nextInt());
148      put.addColumn(family, qualifier, value);
149    }
150    table.put(put);
151  }
152
153  private void performMultiplePutAndFlush(HBaseAdmin admin, Table table, byte[] row, byte[] family,
154    int nFlushes, int nPuts) throws Exception {
155
156    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(table.getName())) {
157      // connection needed for poll-wait
158      HRegionLocation loc = locator.getRegionLocation(row, true);
159      AdminProtos.AdminService.BlockingInterface server =
160        ((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName());
161      byte[] regName = loc.getRegionInfo().getRegionName();
162
163      for (int i = 0; i < nFlushes; i++) {
164        randomCFPuts(table, row, family, nPuts);
165        List<String> sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY);
166        int sfCount = sf.size();
167
168        admin.flush(table.getName());
169      }
170    }
171  }
172
173  private static List<Cell> toList(ResultScanner scanner) {
174    try {
175      List<Cell> cells = new ArrayList<>();
176      for (Result r : scanner) {
177        cells.addAll(r.listCells());
178      }
179      return cells;
180    } finally {
181      scanner.close();
182    }
183  }
184
185  @Test
186  public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException {
187    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
188      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
189      byte[] row = Bytes.toBytes("SpecifiedRow");
190      byte[] value0 = Bytes.toBytes("value_0");
191      byte[] value1 = Bytes.toBytes("value_1");
192      Put put = new Put(row);
193      put.addColumn(FAMILY, QUALIFIER, VALUE);
194      table.put(put);
195      Delete d = new Delete(row);
196      table.delete(d);
197      put = new Put(row);
198      put.addColumn(FAMILY, null, value0);
199      table.put(put);
200      put = new Put(row);
201      put.addColumn(FAMILY, null, value1);
202      table.put(put);
203      List<Cell> cells = toList(table.getScanner(new Scan()));
204      assertEquals(1, cells.size());
205      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
206
207      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
208      assertEquals(1, cells.size());
209      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
210
211      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
212      assertEquals(0, cells.size());
213
214      TEST_UTIL.getAdmin().flush(tableName);
215      cells = toList(table.getScanner(new Scan()));
216      assertEquals(1, cells.size());
217      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
218
219      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
220      assertEquals(1, cells.size());
221      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
222
223      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
224      assertEquals(0, cells.size());
225    }
226  }
227
228  @Test
229  public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException {
230    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
231      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
232      byte[] row = Bytes.toBytes("SpecifiedRow");
233      byte[] qual0 = Bytes.toBytes("qual0");
234      byte[] qual1 = Bytes.toBytes("qual1");
235      long now = EnvironmentEdgeManager.currentTime();
236      Delete d = new Delete(row, now);
237      table.delete(d);
238
239      Put put = new Put(row);
240      put.addColumn(FAMILY, null, now + 1, VALUE);
241      table.put(put);
242
243      put = new Put(row);
244      put.addColumn(FAMILY, qual1, now + 2, qual1);
245      table.put(put);
246
247      put = new Put(row);
248      put.addColumn(FAMILY, qual0, now + 3, qual0);
249      table.put(put);
250
251      Result r = table.get(new Get(row));
252      assertEquals(r.toString(), 3, r.size());
253      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
254      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
255      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
256
257      TEST_UTIL.getAdmin().flush(tableName);
258      r = table.get(new Get(row));
259      assertEquals(3, r.size());
260      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
261      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
262      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
263    }
264  }
265
266  // override the config settings at the CF level and ensure priority
267  @Test
268  public void testAdvancedConfigOverride() throws Exception {
269    /*
270     * Overall idea: (1) create 3 store files and issue a compaction. config's compaction.min == 3,
271     * so should work. (2) Increase the compaction.min toggle in the HTD to 5 and modify table. If
272     * we use the HTD value instead of the default config value, adding 3 files and issuing a
273     * compaction SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2 and modify
274     * table. The CF schema should override the Table schema and now cause a minor compaction.
275     */
276    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
277
278    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
279      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
280      try (Admin admin = TEST_UTIL.getAdmin()) {
281        ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection();
282
283        // Create 3 store files.
284        byte[] row = Bytes.toBytes(ThreadLocalRandom.current().nextInt());
285        performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 100);
286
287        try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
288          // Verify we have multiple store files.
289          HRegionLocation loc = locator.getRegionLocation(row, true);
290          byte[] regionName = loc.getRegionInfo().getRegionName();
291          AdminProtos.AdminService.BlockingInterface server =
292            connection.getAdmin(loc.getServerName());
293          assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1);
294
295          // Issue a compaction request
296          admin.compact(tableName);
297
298          // poll wait for the compactions to happen
299          for (int i = 0; i < 10 * 1000 / 40; ++i) {
300            // The number of store files after compaction should be lesser.
301            loc = locator.getRegionLocation(row, true);
302            if (!loc.getRegionInfo().isOffline()) {
303              regionName = loc.getRegionInfo().getRegionName();
304              server = connection.getAdmin(loc.getServerName());
305              if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1) {
306                break;
307              }
308            }
309            Thread.sleep(40);
310          }
311          // verify the compactions took place and that we didn't just time out
312          assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1);
313
314          // change the compaction.min config option for this table to 5
315          LOG.info("hbase.hstore.compaction.min should now be 5");
316          HTableDescriptor htd = new HTableDescriptor(table.getTableDescriptor());
317          htd.setValue("hbase.hstore.compaction.min", String.valueOf(5));
318          admin.modifyTable(tableName, htd);
319          Pair<Integer, Integer> st = admin.getAlterStatus(tableName);
320          while (null != st && st.getFirst() > 0) {
321            LOG.debug(st.getFirst() + " regions left to update");
322            Thread.sleep(40);
323            st = admin.getAlterStatus(tableName);
324          }
325          LOG.info("alter status finished");
326
327          // Create 3 more store files.
328          performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 10);
329
330          // Issue a compaction request
331          admin.compact(tableName);
332
333          // This time, the compaction request should not happen
334          Thread.sleep(10 * 1000);
335          loc = locator.getRegionLocation(row, true);
336          regionName = loc.getRegionInfo().getRegionName();
337          server = connection.getAdmin(loc.getServerName());
338          int sfCount = ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size();
339          assertTrue(sfCount > 1);
340
341          // change an individual CF's config option to 2 & online schema update
342          LOG.info("hbase.hstore.compaction.min should now be 2");
343          HColumnDescriptor hcd = new HColumnDescriptor(htd.getFamily(FAMILY));
344          hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2));
345          htd.modifyFamily(hcd);
346          admin.modifyTable(tableName, htd);
347          st = admin.getAlterStatus(tableName);
348          while (null != st && st.getFirst() > 0) {
349            LOG.debug(st.getFirst() + " regions left to update");
350            Thread.sleep(40);
351            st = admin.getAlterStatus(tableName);
352          }
353          LOG.info("alter status finished");
354
355          // Issue a compaction request
356          admin.compact(tableName);
357
358          // poll wait for the compactions to happen
359          for (int i = 0; i < 10 * 1000 / 40; ++i) {
360            loc = locator.getRegionLocation(row, true);
361            regionName = loc.getRegionInfo().getRegionName();
362            try {
363              server = connection.getAdmin(loc.getServerName());
364              if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount) {
365                break;
366              }
367            } catch (Exception e) {
368              LOG.debug("Waiting for region to come online: " + Bytes.toString(regionName));
369            }
370            Thread.sleep(40);
371          }
372
373          // verify the compaction took place and that we didn't just time out
374          assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount);
375
376          // Finally, ensure that we can remove a custom config value after we made it
377          LOG.info("Removing CF config value");
378          LOG.info("hbase.hstore.compaction.min should now be 5");
379          hcd = new HColumnDescriptor(htd.getFamily(FAMILY));
380          hcd.setValue("hbase.hstore.compaction.min", null);
381          htd.modifyFamily(hcd);
382          admin.modifyTable(tableName, htd);
383          st = admin.getAlterStatus(tableName);
384          while (null != st && st.getFirst() > 0) {
385            LOG.debug(st.getFirst() + " regions left to update");
386            Thread.sleep(40);
387            st = admin.getAlterStatus(tableName);
388          }
389          LOG.info("alter status finished");
390          assertNull(
391            table.getTableDescriptor().getFamily(FAMILY).getValue("hbase.hstore.compaction.min"));
392        }
393      }
394    }
395  }
396
397  @Test
398  public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException {
399    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
400      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
401      List actions = (List) new ArrayList();
402      Object[] results = new Object[2];
403      // create an empty Put
404      Put put1 = new Put(ROW);
405      actions.add(put1);
406
407      Put put2 = new Put(ANOTHERROW);
408      put2.addColumn(FAMILY, QUALIFIER, VALUE);
409      actions.add(put2);
410
411      table.batch(actions, results);
412      fail("Empty Put should have failed the batch call");
413    } catch (IllegalArgumentException iae) {
414    }
415  }
416
417  // Test Table.batch with large amount of mutations against the same key.
418  // It used to trigger read lock's "Maximum lock count exceeded" Error.
419  @Test
420  public void testHTableWithLargeBatch() throws IOException, InterruptedException {
421    int sixtyFourK = 64 * 1024;
422    List actions = new ArrayList();
423    Object[] results = new Object[(sixtyFourK + 1) * 2];
424
425    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
426      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
427
428      for (int i = 0; i < sixtyFourK + 1; i++) {
429        Put put1 = new Put(ROW);
430        put1.addColumn(FAMILY, QUALIFIER, VALUE);
431        actions.add(put1);
432
433        Put put2 = new Put(ANOTHERROW);
434        put2.addColumn(FAMILY, QUALIFIER, VALUE);
435        actions.add(put2);
436      }
437
438      table.batch(actions, results);
439    }
440  }
441
442  @Test
443  public void testBatchWithRowMutation() throws Exception {
444    LOG.info("Starting testBatchWithRowMutation");
445    byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") };
446
447    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
448      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
449
450      RowMutations arm = RowMutations
451        .of(Collections.singletonList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE)));
452      Object[] batchResult = new Object[1];
453      table.batch(Arrays.asList(arm), batchResult);
454
455      Get g = new Get(ROW);
456      Result r = table.get(g);
457      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
458
459      arm = RowMutations.of(Arrays.asList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE),
460        new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0])));
461      table.batch(Arrays.asList(arm), batchResult);
462      r = table.get(g);
463      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
464      assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
465
466      // Test that we get the correct remote exception for RowMutations from batch()
467      try {
468        arm = RowMutations.of(Collections.singletonList(
469          new Put(ROW).addColumn(new byte[] { 'b', 'o', 'g', 'u', 's' }, QUALIFIERS[0], VALUE)));
470        table.batch(Arrays.asList(arm), batchResult);
471        fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
472      } catch (RetriesExhaustedWithDetailsException e) {
473        String msg = e.getMessage();
474        assertTrue(msg.contains("NoSuchColumnFamilyException"));
475      }
476    }
477  }
478
479  @Test
480  public void testBatchWithCheckAndMutate() throws Exception {
481    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
482      byte[] row1 = Bytes.toBytes("row1");
483      byte[] row2 = Bytes.toBytes("row2");
484      byte[] row3 = Bytes.toBytes("row3");
485      byte[] row4 = Bytes.toBytes("row4");
486      byte[] row5 = Bytes.toBytes("row5");
487      byte[] row6 = Bytes.toBytes("row6");
488      byte[] row7 = Bytes.toBytes("row7");
489
490      table
491        .put(Arrays.asList(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
492          new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
493          new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
494          new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
495          new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
496          new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
497          new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))));
498
499      CheckAndMutate checkAndMutate1 =
500        CheckAndMutate.newBuilder(row1).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
501          .build(new RowMutations(row1)
502            .add((Mutation) new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
503            .add((Mutation) new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
504            .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
505            .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
506      Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
507      RowMutations mutations = new RowMutations(row3)
508        .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
509        .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
510        .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
511        .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
512      CheckAndMutate checkAndMutate2 =
513        CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
514          .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
515      Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
516      CheckAndMutate checkAndMutate3 =
517        CheckAndMutate.newBuilder(row6).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
518          .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
519      CheckAndMutate checkAndMutate4 =
520        CheckAndMutate.newBuilder(row7).ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
521          .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
522
523      List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
524        checkAndMutate3, checkAndMutate4);
525      Object[] results = new Object[actions.size()];
526      table.batch(actions, results);
527
528      CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0];
529      assertTrue(checkAndMutateResult.isSuccess());
530      assertEquals(3L,
531        Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
532      assertEquals("d",
533        Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
534
535      assertEquals("b", Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
536
537      Result result = (Result) results[2];
538      assertTrue(result.getExists());
539      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
540      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
541
542      checkAndMutateResult = (CheckAndMutateResult) results[3];
543      assertFalse(checkAndMutateResult.isSuccess());
544      assertNull(checkAndMutateResult.getResult());
545
546      assertTrue(((Result) results[4]).isEmpty());
547
548      checkAndMutateResult = (CheckAndMutateResult) results[5];
549      assertTrue(checkAndMutateResult.isSuccess());
550      assertEquals(11,
551        Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("F"))));
552
553      checkAndMutateResult = (CheckAndMutateResult) results[6];
554      assertTrue(checkAndMutateResult.isSuccess());
555      assertEquals("gg",
556        Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("G"))));
557
558      result = table.get(new Get(row1));
559      assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
560      assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
561      assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
562      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
563
564      result = table.get(new Get(row3));
565      assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
566      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
567      assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
568      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
569      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
570
571      result = table.get(new Get(row4));
572      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
573
574      result = table.get(new Get(row5));
575      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
576
577      result = table.get(new Get(row6));
578      assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
579
580      result = table.get(new Get(row7));
581      assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
582    }
583  }
584
585  @Test
586  public void testHTableExistsMethodSingleRegionSingleGet()
587    throws IOException, InterruptedException {
588    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
589      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
590
591      // Test with a single region table.
592      Put put = new Put(ROW);
593      put.addColumn(FAMILY, QUALIFIER, VALUE);
594
595      Get get = new Get(ROW);
596
597      boolean exist = table.exists(get);
598      assertFalse(exist);
599
600      table.put(put);
601
602      exist = table.exists(get);
603      assertTrue(exist);
604    }
605  }
606
607  @Test
608  public void testHTableExistsMethodSingleRegionMultipleGets()
609    throws IOException, InterruptedException {
610    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
611      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
612
613      Put put = new Put(ROW);
614      put.addColumn(FAMILY, QUALIFIER, VALUE);
615      table.put(put);
616
617      List<Get> gets = new ArrayList<>();
618      gets.add(new Get(ROW));
619      gets.add(new Get(ANOTHERROW));
620
621      boolean[] results = table.exists(gets);
622      assertTrue(results[0]);
623      assertFalse(results[1]);
624    }
625  }
626
627  @Test
628  public void testHTableExistsBeforeGet() throws IOException, InterruptedException {
629    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
630      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
631
632      Put put = new Put(ROW);
633      put.addColumn(FAMILY, QUALIFIER, VALUE);
634      table.put(put);
635
636      Get get = new Get(ROW);
637
638      boolean exist = table.exists(get);
639      assertEquals(true, exist);
640
641      Result result = table.get(get);
642      assertEquals(false, result.isEmpty());
643      assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)));
644    }
645  }
646
647  @Test
648  public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException {
649    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
650      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
651
652      final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2"));
653      Put put = new Put(ROW);
654      put.addColumn(FAMILY, QUALIFIER, VALUE);
655      table.put(put);
656      put = new Put(ROW2);
657      put.addColumn(FAMILY, QUALIFIER, VALUE);
658      table.put(put);
659
660      Get get = new Get(ROW);
661      Get get2 = new Get(ROW2);
662      ArrayList<Get> getList = new ArrayList(2);
663      getList.add(get);
664      getList.add(get2);
665
666      boolean[] exists = table.existsAll(getList);
667      assertEquals(true, exists[0]);
668      assertEquals(true, exists[1]);
669
670      Result[] result = table.get(getList);
671      assertEquals(false, result[0].isEmpty());
672      assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER)));
673      assertEquals(false, result[1].isEmpty());
674      assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER)));
675    }
676  }
677
678  @Test
679  public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
680    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1,
681      new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
682      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
683
684      Put put = new Put(ROW);
685      put.addColumn(FAMILY, QUALIFIER, VALUE);
686
687      Get get = new Get(ROW);
688
689      boolean exist = table.exists(get);
690      assertFalse(exist);
691
692      table.put(put);
693
694      exist = table.exists(get);
695      assertTrue(exist);
696    }
697  }
698
699  @Test
700  public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
701    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1,
702      new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
703      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
704
705      Put put = new Put(ROW);
706      put.addColumn(FAMILY, QUALIFIER, VALUE);
707      table.put(put);
708
709      List<Get> gets = new ArrayList<>();
710      gets.add(new Get(ANOTHERROW));
711      gets.add(new Get(Bytes.add(ROW, new byte[] { 0x00 })));
712      gets.add(new Get(ROW));
713      gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 })));
714
715      LOG.info("Calling exists");
716      boolean[] results = table.existsAll(gets);
717      assertFalse(results[0]);
718      assertFalse(results[1]);
719      assertTrue(results[2]);
720      assertFalse(results[3]);
721
722      // Test with the first region.
723      put = new Put(new byte[] { 0x00 });
724      put.addColumn(FAMILY, QUALIFIER, VALUE);
725      table.put(put);
726
727      gets = new ArrayList<>();
728      gets.add(new Get(new byte[] { 0x00 }));
729      gets.add(new Get(new byte[] { 0x00, 0x00 }));
730      results = table.existsAll(gets);
731      assertTrue(results[0]);
732      assertFalse(results[1]);
733
734      // Test with the last region
735      put = new Put(new byte[] { (byte) 0xff, (byte) 0xff });
736      put.addColumn(FAMILY, QUALIFIER, VALUE);
737      table.put(put);
738
739      gets = new ArrayList<>();
740      gets.add(new Get(new byte[] { (byte) 0xff }));
741      gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff }));
742      gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff }));
743      results = table.existsAll(gets);
744      assertFalse(results[0]);
745      assertTrue(results[1]);
746      assertFalse(results[2]);
747    }
748  }
749
750  @Test
751  public void testGetEmptyRow() throws Exception {
752    // Create a table and put in 1 row
753    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
754      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
755
756      Put put = new Put(ROW_BYTES);
757      put.addColumn(FAMILY, COL_QUAL, VAL_BYTES);
758      table.put(put);
759
760      // Try getting the row with an empty row key
761      Result res = null;
762      try {
763        res = table.get(new Get(new byte[0]));
764        fail();
765      } catch (IllegalArgumentException e) {
766        // Expected.
767      }
768      assertTrue(res == null);
769      res = table.get(new Get(Bytes.toBytes("r1-not-exist")));
770      assertTrue(res.isEmpty() == true);
771      res = table.get(new Get(ROW_BYTES));
772      assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
773    }
774  }
775
776  @Test
777  public void testConnectionDefaultUsesCodec() throws Exception {
778    ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection();
779    assertTrue(con.hasCellBlockSupport());
780  }
781
782  @Test
783  public void testPutWithPreBatchMutate() throws Exception {
784    testPreBatchMutate(tableName, () -> {
785      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
786        Put put = new Put(ROW);
787        put.addColumn(FAMILY, QUALIFIER, VALUE);
788        t.put(put);
789      } catch (IOException ex) {
790        throw new RuntimeException(ex);
791      }
792    });
793  }
794
795  @Test
796  public void testRowMutationsWithPreBatchMutate() throws Exception {
797    testPreBatchMutate(tableName, () -> {
798      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
799        RowMutations rm = new RowMutations(ROW, 1);
800        Put put = new Put(ROW);
801        put.addColumn(FAMILY, QUALIFIER, VALUE);
802        rm.add(put);
803        t.mutateRow(rm);
804      } catch (IOException ex) {
805        throw new RuntimeException(ex);
806      }
807    });
808  }
809
810  private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception {
811    HTableDescriptor desc = new HTableDescriptor(tableName);
812    desc.addCoprocessor(WaitingForScanObserver.class.getName());
813    desc.addFamily(new HColumnDescriptor(FAMILY));
814    TEST_UTIL.getAdmin().createTable(desc);
815    // Don't use waitTableAvailable(), because the scanner will mess up the co-processor
816
817    ExecutorService service = Executors.newFixedThreadPool(2);
818    service.execute(rn);
819    final List<Cell> cells = new ArrayList<>();
820    service.execute(() -> {
821      try {
822        // waiting for update.
823        TimeUnit.SECONDS.sleep(3);
824        try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
825          Scan scan = new Scan();
826          try (ResultScanner scanner = t.getScanner(scan)) {
827            for (Result r : scanner) {
828              cells.addAll(Arrays.asList(r.rawCells()));
829            }
830          }
831        }
832      } catch (IOException | InterruptedException ex) {
833        throw new RuntimeException(ex);
834      }
835    });
836    service.shutdown();
837    service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
838    assertEquals("The write is blocking by RegionObserver#postBatchMutate"
839      + ", so the data is invisible to reader", 0, cells.size());
840    TEST_UTIL.deleteTable(tableName);
841  }
842
843  @Test
844  public void testLockLeakWithDelta() throws Exception, Throwable {
845    HTableDescriptor desc = new HTableDescriptor(tableName);
846    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
847    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
848    desc.addFamily(new HColumnDescriptor(FAMILY));
849    TEST_UTIL.getAdmin().createTable(desc);
850    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
851
852    // new a connection for lower retry number.
853    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
854    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
855    try (Connection con = ConnectionFactory.createConnection(copy)) {
856      HRegion region = (HRegion) find(tableName);
857      region.setTimeoutForWriteLock(10);
858      ExecutorService putService = Executors.newSingleThreadExecutor();
859      putService.execute(() -> {
860        try (Table table = con.getTable(tableName)) {
861          Put put = new Put(ROW);
862          put.addColumn(FAMILY, QUALIFIER, VALUE);
863          // the put will be blocked by WaitingForMultiMutationsObserver.
864          table.put(put);
865        } catch (IOException ex) {
866          throw new RuntimeException(ex);
867        }
868      });
869      ExecutorService appendService = Executors.newSingleThreadExecutor();
870      appendService.execute(() -> {
871        Append append = new Append(ROW);
872        append.addColumn(FAMILY, QUALIFIER, VALUE);
873        try (Table table = con.getTable(tableName)) {
874          table.append(append);
875          fail("The APPEND should fail because the target lock is blocked by previous put");
876        } catch (Exception ex) {
877        }
878      });
879      appendService.shutdown();
880      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
881      WaitingForMultiMutationsObserver observer =
882        find(tableName, WaitingForMultiMutationsObserver.class);
883      observer.latch.countDown();
884      putService.shutdown();
885      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
886      try (Table table = con.getTable(tableName)) {
887        Result r = table.get(new Get(ROW));
888        assertFalse(r.isEmpty());
889        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
890      }
891    }
892    HRegion region = (HRegion) find(tableName);
893    int readLockCount = region.getReadLockCount();
894    LOG.info("readLockCount:" + readLockCount);
895    assertEquals(0, readLockCount);
896  }
897
898  @Test
899  public void testMultiRowMutations() throws Exception, Throwable {
900    HTableDescriptor desc = new HTableDescriptor(tableName);
901    desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
902    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
903    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
904    desc.addFamily(new HColumnDescriptor(FAMILY));
905    TEST_UTIL.getAdmin().createTable(desc);
906    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
907
908    // new a connection for lower retry number.
909    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
910    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
911    try (Connection con = ConnectionFactory.createConnection(copy)) {
912      byte[] row = Bytes.toBytes("ROW-0");
913      byte[] rowLocked = Bytes.toBytes("ROW-1");
914      byte[] value0 = Bytes.toBytes("VALUE-0");
915      byte[] value1 = Bytes.toBytes("VALUE-1");
916      byte[] value2 = Bytes.toBytes("VALUE-2");
917      assertNoLocks(tableName);
918      ExecutorService putService = Executors.newSingleThreadExecutor();
919      putService.execute(() -> {
920        try (Table table = con.getTable(tableName)) {
921          Put put0 = new Put(rowLocked);
922          put0.addColumn(FAMILY, QUALIFIER, value0);
923          // the put will be blocked by WaitingForMultiMutationsObserver.
924          table.put(put0);
925        } catch (IOException ex) {
926          throw new RuntimeException(ex);
927        }
928      });
929      ExecutorService cpService = Executors.newSingleThreadExecutor();
930      cpService.execute(() -> {
931        boolean threw;
932        Put put1 = new Put(row);
933        Put put2 = new Put(rowLocked);
934        put1.addColumn(FAMILY, QUALIFIER, value1);
935        put2.addColumn(FAMILY, QUALIFIER, value2);
936        try (Table table = con.getTable(tableName)) {
937          MultiRowMutationProtos.MutateRowsRequest request =
938            MultiRowMutationProtos.MutateRowsRequest.newBuilder()
939              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
940                org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
941                put1))
942              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
943                org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
944                put2))
945              .build();
946          table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, ROW, ROW,
947            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
948              ServerRpcController controller = new ServerRpcController();
949              CoprocessorRpcUtils.BlockingRpcCallback<
950                MultiRowMutationProtos.MutateRowsResponse> rpcCallback =
951                  new CoprocessorRpcUtils.BlockingRpcCallback<>();
952              exe.mutateRows(controller, request, rpcCallback);
953              return rpcCallback.get();
954            });
955          threw = false;
956        } catch (Throwable ex) {
957          threw = true;
958        }
959        if (!threw) {
960          // Can't call fail() earlier because the catch would eat it.
961          fail("This cp should fail because the target lock is blocked by previous put");
962        }
963      });
964      cpService.shutdown();
965      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
966      WaitingForMultiMutationsObserver observer =
967        find(tableName, WaitingForMultiMutationsObserver.class);
968      observer.latch.countDown();
969      putService.shutdown();
970      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
971      try (Table table = con.getTable(tableName)) {
972        Get g0 = new Get(row);
973        Get g1 = new Get(rowLocked);
974        Result r0 = table.get(g0);
975        Result r1 = table.get(g1);
976        assertTrue(r0.isEmpty());
977        assertFalse(r1.isEmpty());
978        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
979      }
980      assertNoLocks(tableName);
981    }
982  }
983
984  /**
985   * A test case for issue HBASE-17482 After combile seqid with mvcc readpoint, seqid/mvcc is
986   * acquired and stamped onto cells in the append thread, a countdown latch is used to ensure that
987   * happened before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) make
988   * the seqid/mvcc acquirement in handler thread and stamping in append thread No countdown latch
989   * to assure cells in memstore are stamped with seqid/mvcc. If cells without mvcc(A.K.A mvcc=0)
990   * are put into memstore, then a scanner with a smaller readpoint can see these data, which
991   * disobey the multi version concurrency control rules. This test case is to reproduce this
992   * scenario. n
993   */
994  @Test
995  public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException {
996    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
997      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
998      // put two row first to init the scanner
999      Put put = new Put(Bytes.toBytes("0"));
1000      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
1001      table.put(put);
1002      put = new Put(Bytes.toBytes("00"));
1003      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
1004      table.put(put);
1005      Scan scan = new Scan();
1006      scan.setTimeRange(0, Long.MAX_VALUE);
1007      scan.setCaching(1);
1008      ResultScanner scanner = table.getScanner(scan);
1009      int rowNum = scanner.next() != null ? 1 : 0;
1010      // the started scanner shouldn't see the rows put below
1011      for (int i = 1; i < 1000; i++) {
1012        put = new Put(Bytes.toBytes(String.valueOf(i)));
1013        put.setDurability(Durability.ASYNC_WAL);
1014        put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i));
1015        table.put(put);
1016      }
1017      for (Result result : scanner) {
1018        rowNum++;
1019      }
1020      // scanner should only see two rows
1021      assertEquals(2, rowNum);
1022      scanner = table.getScanner(scan);
1023      rowNum = 0;
1024      for (Result result : scanner) {
1025        rowNum++;
1026      }
1027      // the new scanner should see all rows
1028      assertEquals(1001, rowNum);
1029    }
1030  }
1031
1032  @Test
1033  public void testPutThenGetWithMultipleThreads() throws Exception {
1034    final int THREAD_NUM = 20;
1035    final int ROUND_NUM = 10;
1036    for (int round = 0; round < ROUND_NUM; round++) {
1037      ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
1038      final AtomicInteger successCnt = new AtomicInteger(0);
1039      try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
1040        TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1041
1042        for (int i = 0; i < THREAD_NUM; i++) {
1043          final int index = i;
1044          Thread t = new Thread(new Runnable() {
1045
1046            @Override
1047            public void run() {
1048              final byte[] row = Bytes.toBytes("row-" + index);
1049              final byte[] value = Bytes.toBytes("v" + index);
1050              try {
1051                Put put = new Put(row);
1052                put.addColumn(FAMILY, QUALIFIER, value);
1053                ht.put(put);
1054                Get get = new Get(row);
1055                Result result = ht.get(get);
1056                byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
1057                if (Bytes.equals(value, returnedValue)) {
1058                  successCnt.getAndIncrement();
1059                } else {
1060                  LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
1061                    + ", returned value: "
1062                    + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
1063                }
1064              } catch (Throwable e) {
1065                // do nothing
1066              }
1067            }
1068          });
1069          threads.add(t);
1070        }
1071        for (Thread t : threads) {
1072          t.start();
1073        }
1074        for (Thread t : threads) {
1075          t.join();
1076        }
1077        assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
1078      }
1079      TEST_UTIL.deleteTable(tableName);
1080    }
1081  }
1082
1083  private static void assertNoLocks(final TableName tableName)
1084    throws IOException, InterruptedException {
1085    HRegion region = (HRegion) find(tableName);
1086    assertEquals(0, region.getLockedRows().size());
1087  }
1088
1089  private static HRegion find(final TableName tableName) throws IOException, InterruptedException {
1090    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
1091    List<HRegion> regions = rs.getRegions(tableName);
1092    assertEquals(1, regions.size());
1093    return regions.get(0);
1094  }
1095
1096  private static <T extends RegionObserver> T find(final TableName tableName, Class<T> clz)
1097    throws IOException, InterruptedException {
1098    HRegion region = find(tableName);
1099    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
1100    assertTrue("The cp instance should be " + clz.getName() + ", current instance is "
1101      + cp.getClass().getName(), clz.isInstance(cp));
1102    return clz.cast(cp);
1103  }
1104
1105  public static class WaitingForMultiMutationsObserver
1106    implements RegionCoprocessor, RegionObserver {
1107    final CountDownLatch latch = new CountDownLatch(1);
1108
1109    @Override
1110    public Optional<RegionObserver> getRegionObserver() {
1111      return Optional.of(this);
1112    }
1113
1114    @Override
1115    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1116      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1117      try {
1118        latch.await();
1119      } catch (InterruptedException ex) {
1120        throw new IOException(ex);
1121      }
1122    }
1123  }
1124
1125  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
1126    private final CountDownLatch latch = new CountDownLatch(1);
1127
1128    @Override
1129    public Optional<RegionObserver> getRegionObserver() {
1130      return Optional.of(this);
1131    }
1132
1133    @Override
1134    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1135      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1136      try {
1137        // waiting for scanner
1138        latch.await();
1139      } catch (InterruptedException ex) {
1140        throw new IOException(ex);
1141      }
1142    }
1143
1144    @Override
1145    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
1146      final Scan scan, final RegionScanner s) throws IOException {
1147      latch.countDown();
1148      return s;
1149    }
1150  }
1151
1152  static byte[] generateHugeValue(int size) {
1153    Random rand = ThreadLocalRandom.current();
1154    byte[] value = new byte[size];
1155    for (int i = 0; i < value.length; i++) {
1156      value[i] = (byte) rand.nextInt(256);
1157    }
1158    return value;
1159  }
1160
1161  @Test
1162  public void testScanWithBatchSizeReturnIncompleteCells()
1163    throws IOException, InterruptedException {
1164    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
1165      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
1166      .build();
1167    try (Table table = TEST_UTIL.createTable(hd, null)) {
1168      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1169
1170      Put put = new Put(ROW);
1171      put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
1172      table.put(put);
1173
1174      put = new Put(ROW);
1175      put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
1176      table.put(put);
1177
1178      for (int i = 2; i < 5; i++) {
1179        for (int version = 0; version < 2; version++) {
1180          put = new Put(ROW);
1181          put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
1182          table.put(put);
1183        }
1184      }
1185
1186      Scan scan = new Scan();
1187      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
1188        .setMaxResultSize(4 * 1024 * 1024);
1189      Result result;
1190      try (ResultScanner scanner = table.getScanner(scan)) {
1191        List<Result> list = new ArrayList<>();
1192        /*
1193         * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The
1194         * second scan rpc should return a result with 3 cells, because reach the batch limit = 3;
1195         * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
1196         * moreResultsInRegion also would be false. Finally, the client should collect all the cells
1197         * into two result: 2+3 -> 3+2;
1198         */
1199        while ((result = scanner.next()) != null) {
1200          list.add(result);
1201        }
1202
1203        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1204        Assert.assertEquals(2, list.size());
1205        Assert.assertEquals(3, list.get(0).size());
1206        Assert.assertEquals(2, list.get(1).size());
1207      }
1208
1209      scan = new Scan();
1210      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
1211        .setMaxResultSize(4 * 1024 * 1024);
1212      try (ResultScanner scanner = table.getScanner(scan)) {
1213        List<Result> list = new ArrayList<>();
1214        while ((result = scanner.next()) != null) {
1215          list.add(result);
1216        }
1217        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1218        Assert.assertEquals(3, list.size());
1219        Assert.assertEquals(2, list.get(0).size());
1220        Assert.assertEquals(2, list.get(1).size());
1221        Assert.assertEquals(1, list.get(2).size());
1222      }
1223    }
1224  }
1225}