001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Optional;
032import java.util.Random;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.CellUtil;
043import org.apache.hadoop.hbase.Coprocessor;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseTestingUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.RegionMetrics;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
052import org.apache.hadoop.hbase.coprocessor.ObserverContext;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
055import org.apache.hadoop.hbase.coprocessor.RegionObserver;
056import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
057import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
058import org.apache.hadoop.hbase.ipc.RpcClient;
059import org.apache.hadoop.hbase.ipc.RpcClientFactory;
060import org.apache.hadoop.hbase.ipc.ServerRpcController;
061import org.apache.hadoop.hbase.regionserver.HRegion;
062import org.apache.hadoop.hbase.regionserver.HRegionServer;
063import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
064import org.apache.hadoop.hbase.regionserver.RegionScanner;
065import org.apache.hadoop.hbase.testclassification.ClientTests;
066import org.apache.hadoop.hbase.testclassification.LargeTests;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
069import org.junit.After;
070import org.junit.AfterClass;
071import org.junit.Assert;
072import org.junit.Before;
073import org.junit.BeforeClass;
074import org.junit.ClassRule;
075import org.junit.Rule;
076import org.junit.Test;
077import org.junit.experimental.categories.Category;
078import org.junit.rules.TestName;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos;
085
086@Category({ LargeTests.class, ClientTests.class })
087public class TestFromClientSide3 {
088
089  @ClassRule
090  public static final HBaseClassTestRule CLASS_RULE =
091    HBaseClassTestRule.forClass(TestFromClientSide3.class);
092
093  private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class);
094  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
095  private static final int WAITTABLE_MILLIS = 10000;
096  private static byte[] FAMILY = Bytes.toBytes("testFamily");
097  private static int SLAVES = 3;
098  private static final byte[] ROW = Bytes.toBytes("testRow");
099  private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow");
100  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
101  private static final byte[] VALUE = Bytes.toBytes("testValue");
102  private static final byte[] COL_QUAL = Bytes.toBytes("f1");
103  private static final byte[] VAL_BYTES = Bytes.toBytes("v1");
104  private static final byte[] ROW_BYTES = Bytes.toBytes("r1");
105
106  @Rule
107  public TestName name = new TestName();
108  private TableName tableName;
109
110  /**
111   * @throws java.lang.Exception
112   */
113  @BeforeClass
114  public static void setUpBeforeClass() throws Exception {
115    TEST_UTIL.startMiniCluster(SLAVES);
116  }
117
118  /**
119   * @throws java.lang.Exception
120   */
121  @AfterClass
122  public static void tearDownAfterClass() throws Exception {
123    TEST_UTIL.shutdownMiniCluster();
124  }
125
126  @Before
127  public void setUp() throws Exception {
128    tableName = TableName.valueOf(name.getMethodName());
129  }
130
131  @After
132  public void tearDown() throws Exception {
133    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
134      LOG.info("Tear down, remove table=" + htd.getTableName());
135      TEST_UTIL.deleteTable(htd.getTableName());
136    }
137  }
138
139  private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts) throws Exception {
140    Put put = new Put(row);
141    Random rand = ThreadLocalRandom.current();
142    for (int i = 0; i < nPuts; i++) {
143      byte[] qualifier = Bytes.toBytes(rand.nextInt());
144      byte[] value = Bytes.toBytes(rand.nextInt());
145      put.addColumn(family, qualifier, value);
146    }
147    table.put(put);
148  }
149
150  private void performMultiplePutAndFlush(Admin admin, Table table, byte[] row, byte[] family,
151    int nFlushes, int nPuts) throws Exception {
152    for (int i = 0; i < nFlushes; i++) {
153      randomCFPuts(table, row, family, nPuts);
154      admin.flush(table.getName());
155    }
156  }
157
158  private static List<Cell> toList(ResultScanner scanner) {
159    try {
160      List<Cell> cells = new ArrayList<>();
161      for (Result r : scanner) {
162        cells.addAll(r.listCells());
163      }
164      return cells;
165    } finally {
166      scanner.close();
167    }
168  }
169
170  @Test
171  public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException {
172    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
173      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
174      byte[] row = Bytes.toBytes("SpecifiedRow");
175      byte[] value0 = Bytes.toBytes("value_0");
176      byte[] value1 = Bytes.toBytes("value_1");
177      Put put = new Put(row);
178      put.addColumn(FAMILY, QUALIFIER, VALUE);
179      table.put(put);
180      Delete d = new Delete(row);
181      table.delete(d);
182      put = new Put(row);
183      put.addColumn(FAMILY, null, value0);
184      table.put(put);
185      put = new Put(row);
186      put.addColumn(FAMILY, null, value1);
187      table.put(put);
188      List<Cell> cells = toList(table.getScanner(new Scan()));
189      assertEquals(1, cells.size());
190      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
191
192      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
193      assertEquals(1, cells.size());
194      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
195
196      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
197      assertEquals(0, cells.size());
198
199      TEST_UTIL.getAdmin().flush(tableName);
200      cells = toList(table.getScanner(new Scan()));
201      assertEquals(1, cells.size());
202      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
203
204      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
205      assertEquals(1, cells.size());
206      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
207
208      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
209      assertEquals(0, cells.size());
210    }
211  }
212
213  @Test
214  public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException {
215    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
216      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
217      byte[] row = Bytes.toBytes("SpecifiedRow");
218      byte[] qual0 = Bytes.toBytes("qual0");
219      byte[] qual1 = Bytes.toBytes("qual1");
220      long now = EnvironmentEdgeManager.currentTime();
221      Delete d = new Delete(row, now);
222      table.delete(d);
223
224      Put put = new Put(row);
225      put.addColumn(FAMILY, null, now + 1, VALUE);
226      table.put(put);
227
228      put = new Put(row);
229      put.addColumn(FAMILY, qual1, now + 2, qual1);
230      table.put(put);
231
232      put = new Put(row);
233      put.addColumn(FAMILY, qual0, now + 3, qual0);
234      table.put(put);
235
236      Result r = table.get(new Get(row));
237      assertEquals(r.toString(), 3, r.size());
238      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
239      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
240      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
241
242      TEST_UTIL.getAdmin().flush(tableName);
243      r = table.get(new Get(row));
244      assertEquals(3, r.size());
245      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
246      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
247      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
248    }
249  }
250
251  private int getStoreFileCount(Admin admin, ServerName serverName, RegionInfo region)
252    throws IOException {
253    for (RegionMetrics metrics : admin.getRegionMetrics(serverName, region.getTable())) {
254      if (Bytes.equals(region.getRegionName(), metrics.getRegionName())) {
255        return metrics.getStoreFileCount();
256      }
257    }
258    return 0;
259  }
260
261  // override the config settings at the CF level and ensure priority
262  @Test
263  public void testAdvancedConfigOverride() throws Exception {
264    /*
265     * Overall idea: (1) create 3 store files and issue a compaction. config's compaction.min == 3,
266     * so should work. (2) Increase the compaction.min toggle in the HTD to 5 and modify table. If
267     * we use the HTD value instead of the default config value, adding 3 files and issuing a
268     * compaction SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2 and modify
269     * table. The CF schema should override the Table schema and now cause a minor compaction.
270     */
271    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
272
273    final TableName tableName = TableName.valueOf(name.getMethodName());
274    try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 10)) {
275      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
276      Admin admin = TEST_UTIL.getAdmin();
277
278      // Create 3 store files.
279      byte[] row = Bytes.toBytes(ThreadLocalRandom.current().nextInt());
280      performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 100);
281
282      try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
283        // Verify we have multiple store files.
284        HRegionLocation loc = locator.getRegionLocation(row, true);
285        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) > 1);
286
287        // Issue a compaction request
288        admin.compact(tableName);
289
290        // poll wait for the compactions to happen
291        for (int i = 0; i < 10 * 1000 / 40; ++i) {
292          // The number of store files after compaction should be lesser.
293          loc = locator.getRegionLocation(row, true);
294          if (!loc.getRegion().isOffline()) {
295            if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1) {
296              break;
297            }
298          }
299          Thread.sleep(40);
300        }
301        // verify the compactions took place and that we didn't just time out
302        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1);
303
304        // change the compaction.min config option for this table to 5
305        LOG.info("hbase.hstore.compaction.min should now be 5");
306        TableDescriptor htd = TableDescriptorBuilder.newBuilder(table.getDescriptor())
307          .setValue("hbase.hstore.compaction.min", String.valueOf(5)).build();
308        admin.modifyTable(htd);
309        LOG.info("alter status finished");
310
311        // Create 3 more store files.
312        performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 10);
313
314        // Issue a compaction request
315        admin.compact(tableName);
316
317        // This time, the compaction request should not happen
318        Thread.sleep(10 * 1000);
319        loc = locator.getRegionLocation(row, true);
320        int sfCount = getStoreFileCount(admin, loc.getServerName(), loc.getRegion());
321        assertTrue(sfCount > 1);
322
323        // change an individual CF's config option to 2 & online schema update
324        LOG.info("hbase.hstore.compaction.min should now be 2");
325        htd = TableDescriptorBuilder.newBuilder(htd)
326          .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
327            .setValue("hbase.hstore.compaction.min", String.valueOf(2)).build())
328          .build();
329        admin.modifyTable(htd);
330        LOG.info("alter status finished");
331
332        // Issue a compaction request
333        admin.compact(tableName);
334
335        // poll wait for the compactions to happen
336        for (int i = 0; i < 10 * 1000 / 40; ++i) {
337          loc = locator.getRegionLocation(row, true);
338          try {
339            if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount) {
340              break;
341            }
342          } catch (Exception e) {
343            LOG.debug("Waiting for region to come online: "
344              + Bytes.toStringBinary(loc.getRegion().getRegionName()));
345          }
346          Thread.sleep(40);
347        }
348
349        // verify the compaction took place and that we didn't just time out
350        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount);
351
352        // Finally, ensure that we can remove a custom config value after we made it
353        LOG.info("Removing CF config value");
354        LOG.info("hbase.hstore.compaction.min should now be 5");
355        htd = TableDescriptorBuilder.newBuilder(htd)
356          .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
357            .setValue("hbase.hstore.compaction.min", null).build())
358          .build();
359        admin.modifyTable(htd);
360        LOG.info("alter status finished");
361        assertNull(table.getDescriptor().getColumnFamily(FAMILY)
362          .getValue(Bytes.toBytes("hbase.hstore.compaction.min")));
363      }
364    }
365  }
366
367  @Test
368  public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException {
369    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
370      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
371      List actions = (List) new ArrayList();
372      Object[] results = new Object[2];
373      // create an empty Put
374      Put put1 = new Put(ROW);
375      actions.add(put1);
376
377      Put put2 = new Put(ANOTHERROW);
378      put2.addColumn(FAMILY, QUALIFIER, VALUE);
379      actions.add(put2);
380
381      table.batch(actions, results);
382      fail("Empty Put should have failed the batch call");
383    } catch (IllegalArgumentException iae) {
384    }
385  }
386
387  // Test Table.batch with large amount of mutations against the same key.
388  // It used to trigger read lock's "Maximum lock count exceeded" Error.
389  @Test
390  public void testHTableWithLargeBatch() throws IOException, InterruptedException {
391    int sixtyFourK = 64 * 1024;
392    List actions = new ArrayList();
393    Object[] results = new Object[(sixtyFourK + 1) * 2];
394
395    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
396      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
397
398      for (int i = 0; i < sixtyFourK + 1; i++) {
399        Put put1 = new Put(ROW);
400        put1.addColumn(FAMILY, QUALIFIER, VALUE);
401        actions.add(put1);
402
403        Put put2 = new Put(ANOTHERROW);
404        put2.addColumn(FAMILY, QUALIFIER, VALUE);
405        actions.add(put2);
406      }
407
408      table.batch(actions, results);
409    }
410  }
411
412  @Test
413  public void testBatchWithRowMutation() throws Exception {
414    LOG.info("Starting testBatchWithRowMutation");
415    byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") };
416
417    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
418      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
419
420      RowMutations arm = RowMutations
421        .of(Collections.singletonList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE)));
422      Object[] batchResult = new Object[1];
423      table.batch(Arrays.asList(arm), batchResult);
424
425      Get g = new Get(ROW);
426      Result r = table.get(g);
427      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
428
429      arm = RowMutations.of(Arrays.asList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE),
430        new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0])));
431      table.batch(Arrays.asList(arm), batchResult);
432      r = table.get(g);
433      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
434      assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
435
436      // Test that we get the correct remote exception for RowMutations from batch()
437      try {
438        arm = RowMutations.of(Collections.singletonList(
439          new Put(ROW).addColumn(new byte[] { 'b', 'o', 'g', 'u', 's' }, QUALIFIERS[0], VALUE)));
440        table.batch(Arrays.asList(arm), batchResult);
441        fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
442      } catch (RetriesExhaustedException e) {
443        String msg = e.getMessage();
444        assertTrue(msg.contains("NoSuchColumnFamilyException"));
445      }
446    }
447  }
448
449  @Test
450  public void testBatchWithCheckAndMutate() throws Exception {
451    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
452      byte[] row1 = Bytes.toBytes("row1");
453      byte[] row2 = Bytes.toBytes("row2");
454      byte[] row3 = Bytes.toBytes("row3");
455      byte[] row4 = Bytes.toBytes("row4");
456      byte[] row5 = Bytes.toBytes("row5");
457      byte[] row6 = Bytes.toBytes("row6");
458      byte[] row7 = Bytes.toBytes("row7");
459
460      table
461        .put(Arrays.asList(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
462          new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
463          new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
464          new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
465          new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
466          new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
467          new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))));
468
469      CheckAndMutate checkAndMutate1 =
470        CheckAndMutate.newBuilder(row1).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
471          .build(new RowMutations(row1)
472            .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
473            .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
474            .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
475            .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
476      Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
477      RowMutations mutations =
478        new RowMutations(row3).add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
479          .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
480          .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
481          .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
482      CheckAndMutate checkAndMutate2 =
483        CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
484          .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
485      Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
486      CheckAndMutate checkAndMutate3 =
487        CheckAndMutate.newBuilder(row6).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
488          .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
489      CheckAndMutate checkAndMutate4 =
490        CheckAndMutate.newBuilder(row7).ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
491          .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
492
493      List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
494        checkAndMutate3, checkAndMutate4);
495      Object[] results = new Object[actions.size()];
496      table.batch(actions, results);
497
498      CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0];
499      assertTrue(checkAndMutateResult.isSuccess());
500      assertEquals(3L,
501        Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
502      assertEquals("d",
503        Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
504
505      assertEquals("b", Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
506
507      Result result = (Result) results[2];
508      assertTrue(result.getExists());
509      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
510      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
511
512      checkAndMutateResult = (CheckAndMutateResult) results[3];
513      assertFalse(checkAndMutateResult.isSuccess());
514      assertNull(checkAndMutateResult.getResult());
515
516      assertTrue(((Result) results[4]).isEmpty());
517
518      checkAndMutateResult = (CheckAndMutateResult) results[5];
519      assertTrue(checkAndMutateResult.isSuccess());
520      assertEquals(11,
521        Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("F"))));
522
523      checkAndMutateResult = (CheckAndMutateResult) results[6];
524      assertTrue(checkAndMutateResult.isSuccess());
525      assertEquals("gg",
526        Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("G"))));
527
528      result = table.get(new Get(row1));
529      assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
530      assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
531      assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
532      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
533
534      result = table.get(new Get(row3));
535      assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
536      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
537      assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
538      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
539      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
540
541      result = table.get(new Get(row4));
542      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
543
544      result = table.get(new Get(row5));
545      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
546
547      result = table.get(new Get(row6));
548      assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
549
550      result = table.get(new Get(row7));
551      assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
552    }
553  }
554
555  @Test
556  public void testHTableExistsMethodSingleRegionSingleGet()
557    throws IOException, InterruptedException {
558    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
559      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
560
561      // Test with a single region table.
562      Put put = new Put(ROW);
563      put.addColumn(FAMILY, QUALIFIER, VALUE);
564
565      Get get = new Get(ROW);
566
567      boolean exist = table.exists(get);
568      assertFalse(exist);
569
570      table.put(put);
571
572      exist = table.exists(get);
573      assertTrue(exist);
574    }
575  }
576
577  @Test
578  public void testHTableExistsMethodSingleRegionMultipleGets()
579    throws IOException, InterruptedException {
580    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
581      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
582
583      Put put = new Put(ROW);
584      put.addColumn(FAMILY, QUALIFIER, VALUE);
585      table.put(put);
586
587      List<Get> gets = new ArrayList<>();
588      gets.add(new Get(ROW));
589      gets.add(new Get(ANOTHERROW));
590
591      boolean[] results = table.exists(gets);
592      assertTrue(results[0]);
593      assertFalse(results[1]);
594    }
595  }
596
597  @Test
598  public void testHTableExistsBeforeGet() throws IOException, InterruptedException {
599    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
600      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
601
602      Put put = new Put(ROW);
603      put.addColumn(FAMILY, QUALIFIER, VALUE);
604      table.put(put);
605
606      Get get = new Get(ROW);
607
608      boolean exist = table.exists(get);
609      assertEquals(true, exist);
610
611      Result result = table.get(get);
612      assertEquals(false, result.isEmpty());
613      assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)));
614    }
615  }
616
617  @Test
618  public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException {
619    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
620      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
621
622      final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2"));
623      Put put = new Put(ROW);
624      put.addColumn(FAMILY, QUALIFIER, VALUE);
625      table.put(put);
626      put = new Put(ROW2);
627      put.addColumn(FAMILY, QUALIFIER, VALUE);
628      table.put(put);
629
630      Get get = new Get(ROW);
631      Get get2 = new Get(ROW2);
632      ArrayList<Get> getList = new ArrayList(2);
633      getList.add(get);
634      getList.add(get2);
635
636      boolean[] exists = table.exists(getList);
637      assertEquals(true, exists[0]);
638      assertEquals(true, exists[1]);
639
640      Result[] result = table.get(getList);
641      assertEquals(false, result[0].isEmpty());
642      assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER)));
643      assertEquals(false, result[1].isEmpty());
644      assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER)));
645    }
646  }
647
648  @Test
649  public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
650    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1,
651      new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
652      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
653
654      Put put = new Put(ROW);
655      put.addColumn(FAMILY, QUALIFIER, VALUE);
656
657      Get get = new Get(ROW);
658
659      boolean exist = table.exists(get);
660      assertFalse(exist);
661
662      table.put(put);
663
664      exist = table.exists(get);
665      assertTrue(exist);
666    }
667  }
668
669  @Test
670  public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
671    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1,
672      new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
673      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
674
675      Put put = new Put(ROW);
676      put.addColumn(FAMILY, QUALIFIER, VALUE);
677      table.put(put);
678
679      List<Get> gets = new ArrayList<>();
680      gets.add(new Get(ANOTHERROW));
681      gets.add(new Get(Bytes.add(ROW, new byte[] { 0x00 })));
682      gets.add(new Get(ROW));
683      gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 })));
684
685      LOG.info("Calling exists");
686      boolean[] results = table.exists(gets);
687      assertFalse(results[0]);
688      assertFalse(results[1]);
689      assertTrue(results[2]);
690      assertFalse(results[3]);
691
692      // Test with the first region.
693      put = new Put(new byte[] { 0x00 });
694      put.addColumn(FAMILY, QUALIFIER, VALUE);
695      table.put(put);
696
697      gets = new ArrayList<>();
698      gets.add(new Get(new byte[] { 0x00 }));
699      gets.add(new Get(new byte[] { 0x00, 0x00 }));
700      results = table.exists(gets);
701      assertTrue(results[0]);
702      assertFalse(results[1]);
703
704      // Test with the last region
705      put = new Put(new byte[] { (byte) 0xff, (byte) 0xff });
706      put.addColumn(FAMILY, QUALIFIER, VALUE);
707      table.put(put);
708
709      gets = new ArrayList<>();
710      gets.add(new Get(new byte[] { (byte) 0xff }));
711      gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff }));
712      gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff }));
713      results = table.exists(gets);
714      assertFalse(results[0]);
715      assertTrue(results[1]);
716      assertFalse(results[2]);
717    }
718  }
719
720  @Test
721  public void testGetEmptyRow() throws Exception {
722    // Create a table and put in 1 row
723    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
724      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
725
726      Put put = new Put(ROW_BYTES);
727      put.addColumn(FAMILY, COL_QUAL, VAL_BYTES);
728      table.put(put);
729
730      // Try getting the row with an empty row key
731      Result res = null;
732      try {
733        res = table.get(new Get(new byte[0]));
734        fail();
735      } catch (IllegalArgumentException e) {
736        // Expected.
737      }
738      assertTrue(res == null);
739      res = table.get(new Get(Bytes.toBytes("r1-not-exist")));
740      assertTrue(res.isEmpty() == true);
741      res = table.get(new Get(ROW_BYTES));
742      assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
743    }
744  }
745
746  @Test
747  public void testConnectionDefaultUsesCodec() throws Exception {
748    try (
749      RpcClient client = RpcClientFactory.createClient(TEST_UTIL.getConfiguration(), "cluster")) {
750      assertTrue(client.hasCellBlockSupport());
751    }
752  }
753
754  @Test
755  public void testPutWithPreBatchMutate() throws Exception {
756    testPreBatchMutate(tableName, () -> {
757      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
758        Put put = new Put(ROW);
759        put.addColumn(FAMILY, QUALIFIER, VALUE);
760        t.put(put);
761      } catch (IOException ex) {
762        throw new RuntimeException(ex);
763      }
764    });
765  }
766
767  @Test
768  public void testRowMutationsWithPreBatchMutate() throws Exception {
769    testPreBatchMutate(tableName, () -> {
770      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
771        RowMutations rm = new RowMutations(ROW, 1);
772        Put put = new Put(ROW);
773        put.addColumn(FAMILY, QUALIFIER, VALUE);
774        rm.add(put);
775        t.mutateRow(rm);
776      } catch (IOException ex) {
777        throw new RuntimeException(ex);
778      }
779    });
780  }
781
782  private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception {
783    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
784      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
785      .setCoprocessor(WaitingForScanObserver.class.getName()).build();
786    TEST_UTIL.getAdmin().createTable(tableDescriptor);
787    // Don't use waitTableAvailable(), because the scanner will mess up the co-processor
788
789    ExecutorService service = Executors.newFixedThreadPool(2);
790    service.execute(rn);
791    final List<Cell> cells = new ArrayList<>();
792    service.execute(() -> {
793      try {
794        // waiting for update.
795        TimeUnit.SECONDS.sleep(3);
796        try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
797          Scan scan = new Scan();
798          try (ResultScanner scanner = t.getScanner(scan)) {
799            for (Result r : scanner) {
800              cells.addAll(Arrays.asList(r.rawCells()));
801            }
802          }
803        }
804      } catch (IOException | InterruptedException ex) {
805        throw new RuntimeException(ex);
806      }
807    });
808    service.shutdown();
809    service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
810    assertEquals("The write is blocking by RegionObserver#postBatchMutate"
811      + ", so the data is invisible to reader", 0, cells.size());
812    TEST_UTIL.deleteTable(tableName);
813  }
814
815  @Test
816  public void testLockLeakWithDelta() throws Exception, Throwable {
817    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
818      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
819      .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
820      .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
821    TEST_UTIL.getAdmin().createTable(tableDescriptor);
822    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
823
824    // new a connection for lower retry number.
825    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
826    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
827    try (Connection con = ConnectionFactory.createConnection(copy)) {
828      HRegion region = (HRegion) find(tableName);
829      region.setTimeoutForWriteLock(10);
830      ExecutorService putService = Executors.newSingleThreadExecutor();
831      putService.execute(() -> {
832        try (Table table = con.getTable(tableName)) {
833          Put put = new Put(ROW);
834          put.addColumn(FAMILY, QUALIFIER, VALUE);
835          // the put will be blocked by WaitingForMultiMutationsObserver.
836          table.put(put);
837        } catch (IOException ex) {
838          throw new RuntimeException(ex);
839        }
840      });
841      ExecutorService appendService = Executors.newSingleThreadExecutor();
842      appendService.execute(() -> {
843        Append append = new Append(ROW);
844        append.addColumn(FAMILY, QUALIFIER, VALUE);
845        try (Table table = con.getTable(tableName)) {
846          table.append(append);
847          fail("The APPEND should fail because the target lock is blocked by previous put");
848        } catch (Exception ex) {
849        }
850      });
851      appendService.shutdown();
852      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
853      WaitingForMultiMutationsObserver observer =
854        find(tableName, WaitingForMultiMutationsObserver.class);
855      observer.latch.countDown();
856      putService.shutdown();
857      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
858      try (Table table = con.getTable(tableName)) {
859        Result r = table.get(new Get(ROW));
860        assertFalse(r.isEmpty());
861        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
862      }
863    }
864    HRegion region = (HRegion) find(tableName);
865    int readLockCount = region.getReadLockCount();
866    LOG.info("readLockCount:" + readLockCount);
867    assertEquals(0, readLockCount);
868  }
869
870  @Test
871  public void testMultiRowMutations() throws Exception, Throwable {
872    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
873      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
874      .setCoprocessor(MultiRowMutationEndpoint.class.getName())
875      .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
876      .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
877    TEST_UTIL.getAdmin().createTable(tableDescriptor);
878    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
879
880    // new a connection for lower retry number.
881    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
882    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
883    try (Connection con = ConnectionFactory.createConnection(copy)) {
884      byte[] row = Bytes.toBytes("ROW-0");
885      byte[] rowLocked = Bytes.toBytes("ROW-1");
886      byte[] value0 = Bytes.toBytes("VALUE-0");
887      byte[] value1 = Bytes.toBytes("VALUE-1");
888      byte[] value2 = Bytes.toBytes("VALUE-2");
889      assertNoLocks(tableName);
890      ExecutorService putService = Executors.newSingleThreadExecutor();
891      putService.execute(() -> {
892        try (Table table = con.getTable(tableName)) {
893          Put put0 = new Put(rowLocked);
894          put0.addColumn(FAMILY, QUALIFIER, value0);
895          // the put will be blocked by WaitingForMultiMutationsObserver.
896          table.put(put0);
897        } catch (IOException ex) {
898          throw new RuntimeException(ex);
899        }
900      });
901      ExecutorService cpService = Executors.newSingleThreadExecutor();
902      AtomicBoolean exceptionDuringMutateRows = new AtomicBoolean();
903      cpService.execute(() -> {
904        Put put1 = new Put(row);
905        Put put2 = new Put(rowLocked);
906        put1.addColumn(FAMILY, QUALIFIER, value1);
907        put2.addColumn(FAMILY, QUALIFIER, value2);
908        try (Table table = con.getTable(tableName)) {
909          MultiRowMutationProtos.MutateRowsRequest request =
910            MultiRowMutationProtos.MutateRowsRequest.newBuilder()
911              .addMutationRequest(
912                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put1))
913              .addMutationRequest(
914                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put2))
915              .build();
916          table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, ROW, ROW,
917            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
918              ServerRpcController controller = new ServerRpcController();
919              CoprocessorRpcUtils.BlockingRpcCallback<
920                MultiRowMutationProtos.MutateRowsResponse> rpcCallback =
921                  new CoprocessorRpcUtils.BlockingRpcCallback<>();
922              exe.mutateRows(controller, request, rpcCallback);
923              if (
924                controller.failedOnException()
925                  && !(controller.getFailedOn() instanceof UnknownProtocolException)
926              ) {
927                exceptionDuringMutateRows.set(true);
928              }
929              return rpcCallback.get();
930            });
931        } catch (Throwable ex) {
932          LOG.error("encountered " + ex);
933        }
934      });
935      cpService.shutdown();
936      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
937      WaitingForMultiMutationsObserver observer =
938        find(tableName, WaitingForMultiMutationsObserver.class);
939      observer.latch.countDown();
940      putService.shutdown();
941      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
942      try (Table table = con.getTable(tableName)) {
943        Get g0 = new Get(row);
944        Get g1 = new Get(rowLocked);
945        Result r0 = table.get(g0);
946        Result r1 = table.get(g1);
947        assertTrue(r0.isEmpty());
948        assertFalse(r1.isEmpty());
949        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
950      }
951      assertNoLocks(tableName);
952      if (!exceptionDuringMutateRows.get()) {
953        fail("This cp should fail because the target lock is blocked by previous put");
954      }
955    }
956  }
957
958  /**
959   * A test case for issue HBASE-17482 After combile seqid with mvcc readpoint, seqid/mvcc is
960   * acquired and stamped onto cells in the append thread, a countdown latch is used to ensure that
961   * happened before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) make
962   * the seqid/mvcc acquirement in handler thread and stamping in append thread No countdown latch
963   * to assure cells in memstore are stamped with seqid/mvcc. If cells without mvcc(A.K.A mvcc=0)
964   * are put into memstore, then a scanner with a smaller readpoint can see these data, which
965   * disobey the multi version concurrency control rules. This test case is to reproduce this
966   * scenario.
967   */
968  @Test
969  public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException {
970    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
971      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
972      // put two row first to init the scanner
973      Put put = new Put(Bytes.toBytes("0"));
974      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
975      table.put(put);
976      put = new Put(Bytes.toBytes("00"));
977      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
978      table.put(put);
979      Scan scan = new Scan();
980      scan.setTimeRange(0, Long.MAX_VALUE);
981      scan.setCaching(1);
982      ResultScanner scanner = table.getScanner(scan);
983      int rowNum = scanner.next() != null ? 1 : 0;
984      // the started scanner shouldn't see the rows put below
985      for (int i = 1; i < 1000; i++) {
986        put = new Put(Bytes.toBytes(String.valueOf(i)));
987        put.setDurability(Durability.ASYNC_WAL);
988        put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i));
989        table.put(put);
990      }
991      for (Result result : scanner) {
992        rowNum++;
993      }
994      // scanner should only see two rows
995      assertEquals(2, rowNum);
996      scanner = table.getScanner(scan);
997      rowNum = 0;
998      for (Result result : scanner) {
999        rowNum++;
1000      }
1001      // the new scanner should see all rows
1002      assertEquals(1001, rowNum);
1003    }
1004  }
1005
1006  @Test
1007  public void testPutThenGetWithMultipleThreads() throws Exception {
1008    final int THREAD_NUM = 20;
1009    final int ROUND_NUM = 10;
1010    for (int round = 0; round < ROUND_NUM; round++) {
1011      ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
1012      final AtomicInteger successCnt = new AtomicInteger(0);
1013      try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
1014        TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1015
1016        for (int i = 0; i < THREAD_NUM; i++) {
1017          final int index = i;
1018          Thread t = new Thread(new Runnable() {
1019
1020            @Override
1021            public void run() {
1022              final byte[] row = Bytes.toBytes("row-" + index);
1023              final byte[] value = Bytes.toBytes("v" + index);
1024              try {
1025                Put put = new Put(row);
1026                put.addColumn(FAMILY, QUALIFIER, value);
1027                ht.put(put);
1028                Get get = new Get(row);
1029                Result result = ht.get(get);
1030                byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
1031                if (Bytes.equals(value, returnedValue)) {
1032                  successCnt.getAndIncrement();
1033                } else {
1034                  LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
1035                    + ", returned value: "
1036                    + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
1037                }
1038              } catch (Throwable e) {
1039                // do nothing
1040              }
1041            }
1042          });
1043          threads.add(t);
1044        }
1045        for (Thread t : threads) {
1046          t.start();
1047        }
1048        for (Thread t : threads) {
1049          t.join();
1050        }
1051        assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
1052      }
1053      TEST_UTIL.deleteTable(tableName);
1054    }
1055  }
1056
1057  private static void assertNoLocks(final TableName tableName)
1058    throws IOException, InterruptedException {
1059    HRegion region = (HRegion) find(tableName);
1060    assertEquals(0, region.getLockedRows().size());
1061  }
1062
1063  private static HRegion find(final TableName tableName) throws IOException, InterruptedException {
1064    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
1065    List<HRegion> regions = rs.getRegions(tableName);
1066    assertEquals(1, regions.size());
1067    return regions.get(0);
1068  }
1069
1070  private static <T extends RegionObserver> T find(final TableName tableName, Class<T> clz)
1071    throws IOException, InterruptedException {
1072    HRegion region = find(tableName);
1073    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
1074    assertTrue("The cp instance should be " + clz.getName() + ", current instance is "
1075      + cp.getClass().getName(), clz.isInstance(cp));
1076    return clz.cast(cp);
1077  }
1078
1079  public static class WaitingForMultiMutationsObserver
1080    implements RegionCoprocessor, RegionObserver {
1081    final CountDownLatch latch = new CountDownLatch(1);
1082
1083    @Override
1084    public Optional<RegionObserver> getRegionObserver() {
1085      return Optional.of(this);
1086    }
1087
1088    @Override
1089    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1090      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1091      try {
1092        latch.await();
1093      } catch (InterruptedException ex) {
1094        throw new IOException(ex);
1095      }
1096    }
1097  }
1098
1099  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
1100    private final CountDownLatch latch = new CountDownLatch(1);
1101
1102    @Override
1103    public Optional<RegionObserver> getRegionObserver() {
1104      return Optional.of(this);
1105    }
1106
1107    @Override
1108    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1109      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1110      try {
1111        // waiting for scanner
1112        latch.await();
1113      } catch (InterruptedException ex) {
1114        throw new IOException(ex);
1115      }
1116    }
1117
1118    @Override
1119    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
1120      final Scan scan, final RegionScanner s) throws IOException {
1121      latch.countDown();
1122      return s;
1123    }
1124  }
1125
1126  static byte[] generateHugeValue(int size) {
1127    Random rand = ThreadLocalRandom.current();
1128    byte[] value = new byte[size];
1129    for (int i = 0; i < value.length; i++) {
1130      value[i] = (byte) rand.nextInt(256);
1131    }
1132    return value;
1133  }
1134
1135  @Test
1136  public void testScanWithBatchSizeReturnIncompleteCells()
1137    throws IOException, InterruptedException {
1138    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
1139      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
1140      .build();
1141    try (Table table = TEST_UTIL.createTable(hd, null)) {
1142      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1143
1144      Put put = new Put(ROW);
1145      put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
1146      table.put(put);
1147
1148      put = new Put(ROW);
1149      put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
1150      table.put(put);
1151
1152      for (int i = 2; i < 5; i++) {
1153        for (int version = 0; version < 2; version++) {
1154          put = new Put(ROW);
1155          put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
1156          table.put(put);
1157        }
1158      }
1159
1160      Scan scan = new Scan();
1161      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
1162        .setMaxResultSize(4 * 1024 * 1024);
1163      Result result;
1164      try (ResultScanner scanner = table.getScanner(scan)) {
1165        List<Result> list = new ArrayList<>();
1166        /*
1167         * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The
1168         * second scan rpc should return a result with 3 cells, because reach the batch limit = 3;
1169         * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
1170         * moreResultsInRegion also would be false. Finally, the client should collect all the cells
1171         * into two result: 2+3 -> 3+2;
1172         */
1173        while ((result = scanner.next()) != null) {
1174          list.add(result);
1175        }
1176
1177        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1178        Assert.assertEquals(2, list.size());
1179        Assert.assertEquals(3, list.get(0).size());
1180        Assert.assertEquals(2, list.get(1).size());
1181      }
1182
1183      scan = new Scan();
1184      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
1185        .setMaxResultSize(4 * 1024 * 1024);
1186      try (ResultScanner scanner = table.getScanner(scan)) {
1187        List<Result> list = new ArrayList<>();
1188        while ((result = scanner.next()) != null) {
1189          list.add(result);
1190        }
1191        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1192        Assert.assertEquals(3, list.size());
1193        Assert.assertEquals(2, list.get(0).size());
1194        Assert.assertEquals(2, list.get(1).size());
1195        Assert.assertEquals(1, list.get(2).size());
1196      }
1197    }
1198  }
1199}