001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Optional;
032import java.util.Random;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.CellUtil;
043import org.apache.hadoop.hbase.Coprocessor;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseTestingUtility;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.RegionMetrics;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
052import org.apache.hadoop.hbase.coprocessor.ObserverContext;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
055import org.apache.hadoop.hbase.coprocessor.RegionObserver;
056import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
057import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
058import org.apache.hadoop.hbase.ipc.RpcClient;
059import org.apache.hadoop.hbase.ipc.RpcClientFactory;
060import org.apache.hadoop.hbase.ipc.ServerRpcController;
061import org.apache.hadoop.hbase.regionserver.HRegion;
062import org.apache.hadoop.hbase.regionserver.HRegionServer;
063import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
064import org.apache.hadoop.hbase.regionserver.RegionScanner;
065import org.apache.hadoop.hbase.testclassification.ClientTests;
066import org.apache.hadoop.hbase.testclassification.LargeTests;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.junit.After;
069import org.junit.AfterClass;
070import org.junit.Assert;
071import org.junit.Before;
072import org.junit.BeforeClass;
073import org.junit.ClassRule;
074import org.junit.Rule;
075import org.junit.Test;
076import org.junit.experimental.categories.Category;
077import org.junit.rules.TestName;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos;
084
085@Category({LargeTests.class, ClientTests.class})
086public class TestFromClientSide3 {
087
088  @ClassRule
089  public static final HBaseClassTestRule CLASS_RULE =
090      HBaseClassTestRule.forClass(TestFromClientSide3.class);
091
092  private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class);
093  private final static HBaseTestingUtility TEST_UTIL
094    = new HBaseTestingUtility();
095  private static final int WAITTABLE_MILLIS = 10000;
096  private static byte[] FAMILY = Bytes.toBytes("testFamily");
097  private static Random random = new Random();
098  private static int SLAVES = 3;
099  private static final byte[] ROW = Bytes.toBytes("testRow");
100  private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow");
101  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
102  private static final byte[] VALUE = Bytes.toBytes("testValue");
103  private static final byte[] COL_QUAL = Bytes.toBytes("f1");
104  private static final byte[] VAL_BYTES = Bytes.toBytes("v1");
105  private static final byte[] ROW_BYTES = Bytes.toBytes("r1");
106
107  @Rule
108  public TestName name = new TestName();
109  private TableName tableName;
110
111  /**
112   * @throws java.lang.Exception
113   */
114  @BeforeClass
115  public static void setUpBeforeClass() throws Exception {
116    TEST_UTIL.startMiniCluster(SLAVES);
117  }
118
119  /**
120   * @throws java.lang.Exception
121   */
122  @AfterClass
123  public static void tearDownAfterClass() throws Exception {
124    TEST_UTIL.shutdownMiniCluster();
125  }
126
127  @Before
128  public void setUp() throws Exception {
129    tableName = TableName.valueOf(name.getMethodName());
130  }
131
132  @After
133  public void tearDown() throws Exception {
134    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
135      LOG.info("Tear down, remove table=" + htd.getTableName());
136      TEST_UTIL.deleteTable(htd.getTableName());
137    }
138  }
139
140  private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts)
141      throws Exception {
142    Put put = new Put(row);
143    for (int i = 0; i < nPuts; i++) {
144      byte[] qualifier = Bytes.toBytes(random.nextInt());
145      byte[] value = Bytes.toBytes(random.nextInt());
146      put.addColumn(family, qualifier, value);
147    }
148    table.put(put);
149  }
150
151  private void performMultiplePutAndFlush(Admin admin, Table table, byte[] row, byte[] family,
152      int nFlushes, int nPuts) throws Exception {
153    for (int i = 0; i < nFlushes; i++) {
154      randomCFPuts(table, row, family, nPuts);
155      admin.flush(table.getName());
156    }
157  }
158
159  private static List<Cell> toList(ResultScanner scanner) {
160    try {
161      List<Cell> cells = new ArrayList<>();
162      for (Result r : scanner) {
163        cells.addAll(r.listCells());
164      }
165      return cells;
166    } finally {
167      scanner.close();
168    }
169  }
170
171  @Test
172  public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException {
173    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
174      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
175      byte[] row = Bytes.toBytes("SpecifiedRow");
176      byte[] value0 = Bytes.toBytes("value_0");
177      byte[] value1 = Bytes.toBytes("value_1");
178      Put put = new Put(row);
179      put.addColumn(FAMILY, QUALIFIER, VALUE);
180      table.put(put);
181      Delete d = new Delete(row);
182      table.delete(d);
183      put = new Put(row);
184      put.addColumn(FAMILY, null, value0);
185      table.put(put);
186      put = new Put(row);
187      put.addColumn(FAMILY, null, value1);
188      table.put(put);
189      List<Cell> cells = toList(table.getScanner(new Scan()));
190      assertEquals(1, cells.size());
191      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
192
193      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
194      assertEquals(1, cells.size());
195      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
196
197      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
198      assertEquals(0, cells.size());
199
200      TEST_UTIL.getAdmin().flush(tableName);
201      cells = toList(table.getScanner(new Scan()));
202      assertEquals(1, cells.size());
203      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
204
205      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
206      assertEquals(1, cells.size());
207      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
208
209      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
210      assertEquals(0, cells.size());
211    }
212  }
213
214  @Test
215  public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException {
216    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
217      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
218      byte[] row = Bytes.toBytes("SpecifiedRow");
219      byte[] qual0 = Bytes.toBytes("qual0");
220      byte[] qual1 = Bytes.toBytes("qual1");
221      long now = System.currentTimeMillis();
222      Delete d = new Delete(row, now);
223      table.delete(d);
224
225      Put put = new Put(row);
226      put.addColumn(FAMILY, null, now + 1, VALUE);
227      table.put(put);
228
229      put = new Put(row);
230      put.addColumn(FAMILY, qual1, now + 2, qual1);
231      table.put(put);
232
233      put = new Put(row);
234      put.addColumn(FAMILY, qual0, now + 3, qual0);
235      table.put(put);
236
237      Result r = table.get(new Get(row));
238      assertEquals(r.toString(), 3, r.size());
239      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
240      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
241      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
242
243      TEST_UTIL.getAdmin().flush(tableName);
244      r = table.get(new Get(row));
245      assertEquals(3, r.size());
246      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
247      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
248      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
249    }
250  }
251
252  private int getStoreFileCount(Admin admin, ServerName serverName, RegionInfo region)
253      throws IOException {
254    for (RegionMetrics metrics : admin.getRegionMetrics(serverName, region.getTable())) {
255      if (Bytes.equals(region.getRegionName(), metrics.getRegionName())) {
256        return metrics.getStoreFileCount();
257      }
258    }
259    return 0;
260  }
261
262  // override the config settings at the CF level and ensure priority
263  @Test
264  public void testAdvancedConfigOverride() throws Exception {
265    /*
266     * Overall idea: (1) create 3 store files and issue a compaction. config's
267     * compaction.min == 3, so should work. (2) Increase the compaction.min
268     * toggle in the HTD to 5 and modify table. If we use the HTD value instead
269     * of the default config value, adding 3 files and issuing a compaction
270     * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2
271     * and modify table. The CF schema should override the Table schema and now
272     * cause a minor compaction.
273     */
274    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
275
276    final TableName tableName = TableName.valueOf(name.getMethodName());
277    try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 10)) {
278      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
279      Admin admin = TEST_UTIL.getAdmin();
280
281      // Create 3 store files.
282      byte[] row = Bytes.toBytes(random.nextInt());
283      performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 100);
284
285      try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
286        // Verify we have multiple store files.
287        HRegionLocation loc = locator.getRegionLocation(row, true);
288        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) > 1);
289
290        // Issue a compaction request
291        admin.compact(tableName);
292
293        // poll wait for the compactions to happen
294        for (int i = 0; i < 10 * 1000 / 40; ++i) {
295          // The number of store files after compaction should be lesser.
296          loc = locator.getRegionLocation(row, true);
297          if (!loc.getRegion().isOffline()) {
298            if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1) {
299              break;
300            }
301          }
302          Thread.sleep(40);
303        }
304        // verify the compactions took place and that we didn't just time out
305        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1);
306
307        // change the compaction.min config option for this table to 5
308        LOG.info("hbase.hstore.compaction.min should now be 5");
309        TableDescriptor htd = TableDescriptorBuilder.newBuilder(table.getDescriptor())
310          .setValue("hbase.hstore.compaction.min", String.valueOf(5)).build();
311        admin.modifyTable(htd);
312        LOG.info("alter status finished");
313
314        // Create 3 more store files.
315        performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 10);
316
317        // Issue a compaction request
318        admin.compact(tableName);
319
320        // This time, the compaction request should not happen
321        Thread.sleep(10 * 1000);
322        loc = locator.getRegionLocation(row, true);
323        int sfCount = getStoreFileCount(admin, loc.getServerName(), loc.getRegion());
324        assertTrue(sfCount > 1);
325
326        // change an individual CF's config option to 2 & online schema update
327        LOG.info("hbase.hstore.compaction.min should now be 2");
328        htd = TableDescriptorBuilder.newBuilder(htd)
329          .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
330            .setValue("hbase.hstore.compaction.min", String.valueOf(2)).build())
331          .build();
332        admin.modifyTable(htd);
333        LOG.info("alter status finished");
334
335        // Issue a compaction request
336        admin.compact(tableName);
337
338        // poll wait for the compactions to happen
339        for (int i = 0; i < 10 * 1000 / 40; ++i) {
340          loc = locator.getRegionLocation(row, true);
341          try {
342            if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount) {
343              break;
344            }
345          } catch (Exception e) {
346            LOG.debug("Waiting for region to come online: " +
347              Bytes.toStringBinary(loc.getRegion().getRegionName()));
348          }
349          Thread.sleep(40);
350        }
351
352        // verify the compaction took place and that we didn't just time out
353        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount);
354
355        // Finally, ensure that we can remove a custom config value after we made it
356        LOG.info("Removing CF config value");
357        LOG.info("hbase.hstore.compaction.min should now be 5");
358        htd = TableDescriptorBuilder.newBuilder(htd)
359          .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
360            .setValue("hbase.hstore.compaction.min", null).build())
361          .build();
362        admin.modifyTable(htd);
363        LOG.info("alter status finished");
364        assertNull(table.getDescriptor().getColumnFamily(FAMILY)
365          .getValue(Bytes.toBytes("hbase.hstore.compaction.min")));
366      }
367    }
368  }
369
370  @Test
371  public void testHTableBatchWithEmptyPut () throws IOException, InterruptedException {
372    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
373      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
374      List actions = (List) new ArrayList();
375      Object[] results = new Object[2];
376      // create an empty Put
377      Put put1 = new Put(ROW);
378      actions.add(put1);
379
380      Put put2 = new Put(ANOTHERROW);
381      put2.addColumn(FAMILY, QUALIFIER, VALUE);
382      actions.add(put2);
383
384      table.batch(actions, results);
385      fail("Empty Put should have failed the batch call");
386    } catch (IllegalArgumentException iae) {
387    }
388  }
389
390  // Test Table.batch with large amount of mutations against the same key.
391  // It used to trigger read lock's "Maximum lock count exceeded" Error.
392  @Test
393  public void testHTableWithLargeBatch() throws IOException, InterruptedException {
394    int sixtyFourK = 64 * 1024;
395    List actions = new ArrayList();
396    Object[] results = new Object[(sixtyFourK + 1) * 2];
397
398    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
399      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
400
401      for (int i = 0; i < sixtyFourK + 1; i++) {
402        Put put1 = new Put(ROW);
403        put1.addColumn(FAMILY, QUALIFIER, VALUE);
404        actions.add(put1);
405
406        Put put2 = new Put(ANOTHERROW);
407        put2.addColumn(FAMILY, QUALIFIER, VALUE);
408        actions.add(put2);
409      }
410
411      table.batch(actions, results);
412    }
413  }
414
415  @Test
416  public void testBatchWithRowMutation() throws Exception {
417    LOG.info("Starting testBatchWithRowMutation");
418    byte [][] QUALIFIERS = new byte [][] {
419      Bytes.toBytes("a"), Bytes.toBytes("b")
420    };
421
422    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
423      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
424
425      RowMutations arm = RowMutations.of(Collections.singletonList(
426              new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE)));
427      Object[] batchResult = new Object[1];
428      table.batch(Arrays.asList(arm), batchResult);
429
430      Get g = new Get(ROW);
431      Result r = table.get(g);
432      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
433
434      arm = RowMutations.of(Arrays.asList(
435              new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE),
436              new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0])));
437      table.batch(Arrays.asList(arm), batchResult);
438      r = table.get(g);
439      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
440      assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
441
442      // Test that we get the correct remote exception for RowMutations from batch()
443      try {
444        arm = RowMutations.of(Collections.singletonList(
445                new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE)));
446        table.batch(Arrays.asList(arm), batchResult);
447        fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
448      } catch(RetriesExhaustedException e) {
449        String msg = e.getMessage();
450        assertTrue(msg.contains("NoSuchColumnFamilyException"));
451      }
452    }
453  }
454
455  @Test
456  public void testBatchWithCheckAndMutate() throws Exception {
457    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
458      byte[] row1 = Bytes.toBytes("row1");
459      byte[] row2 = Bytes.toBytes("row2");
460      byte[] row3 = Bytes.toBytes("row3");
461      byte[] row4 = Bytes.toBytes("row4");
462      byte[] row5 = Bytes.toBytes("row5");
463      byte[] row6 = Bytes.toBytes("row6");
464      byte[] row7 = Bytes.toBytes("row7");
465
466      table.put(Arrays.asList(
467        new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
468        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
469        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
470        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
471        new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
472        new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
473        new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))));
474
475      CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
476        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
477        .build(new RowMutations(row1)
478          .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
479          .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
480          .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
481          .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
482      Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
483      RowMutations mutations = new RowMutations(row3)
484        .add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
485        .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
486        .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
487        .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
488      CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
489        .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
490        .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
491      Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
492      CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6)
493        .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
494        .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
495      CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7)
496        .ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
497        .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
498
499      List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
500        checkAndMutate3, checkAndMutate4);
501      Object[] results = new Object[actions.size()];
502      table.batch(actions, results);
503
504      CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0];
505      assertTrue(checkAndMutateResult.isSuccess());
506      assertEquals(3L,
507        Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
508      assertEquals("d",
509        Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
510
511      assertEquals("b",
512        Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
513
514      Result result = (Result) results[2];
515      assertTrue(result.getExists());
516      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
517      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
518
519      checkAndMutateResult = (CheckAndMutateResult) results[3];
520      assertFalse(checkAndMutateResult.isSuccess());
521      assertNull(checkAndMutateResult.getResult());
522
523      assertTrue(((Result) results[4]).isEmpty());
524
525      checkAndMutateResult = (CheckAndMutateResult) results[5];
526      assertTrue(checkAndMutateResult.isSuccess());
527      assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
528        .getValue(FAMILY, Bytes.toBytes("F"))));
529
530      checkAndMutateResult = (CheckAndMutateResult) results[6];
531      assertTrue(checkAndMutateResult.isSuccess());
532      assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
533        .getValue(FAMILY, Bytes.toBytes("G"))));
534
535      result = table.get(new Get(row1));
536      assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
537      assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
538      assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
539      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
540
541      result = table.get(new Get(row3));
542      assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
543      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
544      assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
545      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
546      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
547
548      result = table.get(new Get(row4));
549      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
550
551      result = table.get(new Get(row5));
552      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
553
554      result = table.get(new Get(row6));
555      assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
556
557      result = table.get(new Get(row7));
558      assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
559    }
560  }
561
562  @Test
563  public void testHTableExistsMethodSingleRegionSingleGet()
564          throws IOException, InterruptedException {
565    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
566      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
567
568      // Test with a single region table.
569      Put put = new Put(ROW);
570      put.addColumn(FAMILY, QUALIFIER, VALUE);
571
572      Get get = new Get(ROW);
573
574      boolean exist = table.exists(get);
575      assertFalse(exist);
576
577      table.put(put);
578
579      exist = table.exists(get);
580      assertTrue(exist);
581    }
582  }
583
584  @Test
585  public void testHTableExistsMethodSingleRegionMultipleGets()
586          throws IOException, InterruptedException {
587    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
588      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
589
590      Put put = new Put(ROW);
591      put.addColumn(FAMILY, QUALIFIER, VALUE);
592      table.put(put);
593
594      List<Get> gets = new ArrayList<>();
595      gets.add(new Get(ROW));
596      gets.add(new Get(ANOTHERROW));
597
598      boolean[] results = table.exists(gets);
599      assertTrue(results[0]);
600      assertFalse(results[1]);
601    }
602  }
603
604  @Test
605  public void testHTableExistsBeforeGet() throws IOException, InterruptedException {
606    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
607      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
608
609      Put put = new Put(ROW);
610      put.addColumn(FAMILY, QUALIFIER, VALUE);
611      table.put(put);
612
613      Get get = new Get(ROW);
614
615      boolean exist = table.exists(get);
616      assertEquals(true, exist);
617
618      Result result = table.get(get);
619      assertEquals(false, result.isEmpty());
620      assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)));
621    }
622  }
623
624  @Test
625  public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException {
626    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
627      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
628
629      final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2"));
630      Put put = new Put(ROW);
631      put.addColumn(FAMILY, QUALIFIER, VALUE);
632      table.put(put);
633      put = new Put(ROW2);
634      put.addColumn(FAMILY, QUALIFIER, VALUE);
635      table.put(put);
636
637      Get get = new Get(ROW);
638      Get get2 = new Get(ROW2);
639      ArrayList<Get> getList = new ArrayList(2);
640      getList.add(get);
641      getList.add(get2);
642
643      boolean[] exists = table.exists(getList);
644      assertEquals(true, exists[0]);
645      assertEquals(true, exists[1]);
646
647      Result[] result = table.get(getList);
648      assertEquals(false, result[0].isEmpty());
649      assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER)));
650      assertEquals(false, result[1].isEmpty());
651      assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER)));
652    }
653  }
654
655  @Test
656  public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
657    try (Table table = TEST_UTIL.createTable(
658      tableName, new byte[][] { FAMILY },
659      1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
660      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
661
662      Put put = new Put(ROW);
663      put.addColumn(FAMILY, QUALIFIER, VALUE);
664
665      Get get = new Get(ROW);
666
667      boolean exist = table.exists(get);
668      assertFalse(exist);
669
670      table.put(put);
671
672      exist = table.exists(get);
673      assertTrue(exist);
674    }
675  }
676
677  @Test
678  public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
679    try (Table table = TEST_UTIL.createTable(
680      tableName,
681      new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
682      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
683
684      Put put = new Put(ROW);
685      put.addColumn(FAMILY, QUALIFIER, VALUE);
686      table.put(put);
687
688      List<Get> gets = new ArrayList<>();
689      gets.add(new Get(ANOTHERROW));
690      gets.add(new Get(Bytes.add(ROW, new byte[]{0x00})));
691      gets.add(new Get(ROW));
692      gets.add(new Get(Bytes.add(ANOTHERROW, new byte[]{0x00})));
693
694      LOG.info("Calling exists");
695      boolean[] results = table.exists(gets);
696      assertFalse(results[0]);
697      assertFalse(results[1]);
698      assertTrue(results[2]);
699      assertFalse(results[3]);
700
701      // Test with the first region.
702      put = new Put(new byte[]{0x00});
703      put.addColumn(FAMILY, QUALIFIER, VALUE);
704      table.put(put);
705
706      gets = new ArrayList<>();
707      gets.add(new Get(new byte[]{0x00}));
708      gets.add(new Get(new byte[]{0x00, 0x00}));
709      results = table.exists(gets);
710      assertTrue(results[0]);
711      assertFalse(results[1]);
712
713      // Test with the last region
714      put = new Put(new byte[]{(byte) 0xff, (byte) 0xff});
715      put.addColumn(FAMILY, QUALIFIER, VALUE);
716      table.put(put);
717
718      gets = new ArrayList<>();
719      gets.add(new Get(new byte[]{(byte) 0xff}));
720      gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff}));
721      gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff}));
722      results = table.exists(gets);
723      assertFalse(results[0]);
724      assertTrue(results[1]);
725      assertFalse(results[2]);
726    }
727  }
728
729  @Test
730  public void testGetEmptyRow() throws Exception {
731    //Create a table and put in 1 row
732    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
733      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
734
735      Put put = new Put(ROW_BYTES);
736      put.addColumn(FAMILY, COL_QUAL, VAL_BYTES);
737      table.put(put);
738
739      //Try getting the row with an empty row key
740      Result res = null;
741      try {
742        res = table.get(new Get(new byte[0]));
743        fail();
744      } catch (IllegalArgumentException e) {
745        // Expected.
746      }
747      assertTrue(res == null);
748      res = table.get(new Get(Bytes.toBytes("r1-not-exist")));
749      assertTrue(res.isEmpty() == true);
750      res = table.get(new Get(ROW_BYTES));
751      assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
752    }
753  }
754
755  @Test
756  public void testConnectionDefaultUsesCodec() throws Exception {
757    try (
758      RpcClient client = RpcClientFactory.createClient(TEST_UTIL.getConfiguration(), "cluster")) {
759      assertTrue(client.hasCellBlockSupport());
760    }
761  }
762
763  @Test
764  public void testPutWithPreBatchMutate() throws Exception {
765    testPreBatchMutate(tableName, () -> {
766      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
767        Put put = new Put(ROW);
768        put.addColumn(FAMILY, QUALIFIER, VALUE);
769        t.put(put);
770      } catch (IOException ex) {
771        throw new RuntimeException(ex);
772      }
773    });
774  }
775
776  @Test
777  public void testRowMutationsWithPreBatchMutate() throws Exception {
778    testPreBatchMutate(tableName, () -> {
779      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
780        RowMutations rm = new RowMutations(ROW, 1);
781        Put put = new Put(ROW);
782        put.addColumn(FAMILY, QUALIFIER, VALUE);
783        rm.add(put);
784        t.mutateRow(rm);
785      } catch (IOException ex) {
786        throw new RuntimeException(ex);
787      }
788    });
789  }
790
791  private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception {
792    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
793      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
794      .setCoprocessor(WaitingForScanObserver.class.getName()).build();
795    TEST_UTIL.getAdmin().createTable(tableDescriptor);
796    // Don't use waitTableAvailable(), because the scanner will mess up the co-processor
797
798    ExecutorService service = Executors.newFixedThreadPool(2);
799    service.execute(rn);
800    final List<Cell> cells = new ArrayList<>();
801    service.execute(() -> {
802      try {
803        // waiting for update.
804        TimeUnit.SECONDS.sleep(3);
805        try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
806          Scan scan = new Scan();
807          try (ResultScanner scanner = t.getScanner(scan)) {
808            for (Result r : scanner) {
809              cells.addAll(Arrays.asList(r.rawCells()));
810            }
811          }
812        }
813      } catch (IOException | InterruptedException ex) {
814        throw new RuntimeException(ex);
815      }
816    });
817    service.shutdown();
818    service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
819    assertEquals("The write is blocking by RegionObserver#postBatchMutate"
820            + ", so the data is invisible to reader", 0, cells.size());
821    TEST_UTIL.deleteTable(tableName);
822  }
823
824  @Test
825  public void testLockLeakWithDelta() throws Exception, Throwable {
826    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
827      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
828      .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
829      .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
830    TEST_UTIL.getAdmin().createTable(tableDescriptor);
831    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
832
833    // new a connection for lower retry number.
834    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
835    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
836    try (Connection con = ConnectionFactory.createConnection(copy)) {
837      HRegion region = (HRegion) find(tableName);
838      region.setTimeoutForWriteLock(10);
839      ExecutorService putService = Executors.newSingleThreadExecutor();
840      putService.execute(() -> {
841        try (Table table = con.getTable(tableName)) {
842          Put put = new Put(ROW);
843          put.addColumn(FAMILY, QUALIFIER, VALUE);
844          // the put will be blocked by WaitingForMultiMutationsObserver.
845          table.put(put);
846        } catch (IOException ex) {
847          throw new RuntimeException(ex);
848        }
849      });
850      ExecutorService appendService = Executors.newSingleThreadExecutor();
851      appendService.execute(() -> {
852        Append append = new Append(ROW);
853        append.addColumn(FAMILY, QUALIFIER, VALUE);
854        try (Table table = con.getTable(tableName)) {
855          table.append(append);
856          fail("The APPEND should fail because the target lock is blocked by previous put");
857        } catch (Exception ex) {
858        }
859      });
860      appendService.shutdown();
861      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
862      WaitingForMultiMutationsObserver observer =
863              find(tableName, WaitingForMultiMutationsObserver.class);
864      observer.latch.countDown();
865      putService.shutdown();
866      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
867      try (Table table = con.getTable(tableName)) {
868        Result r = table.get(new Get(ROW));
869        assertFalse(r.isEmpty());
870        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
871      }
872    }
873    HRegion region = (HRegion) find(tableName);
874    int readLockCount = region.getReadLockCount();
875    LOG.info("readLockCount:" + readLockCount);
876    assertEquals(0, readLockCount);
877  }
878
879  @Test
880  public void testMultiRowMutations() throws Exception, Throwable {
881    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
882      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
883      .setCoprocessor(MultiRowMutationEndpoint.class.getName())
884      .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
885      .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
886    TEST_UTIL.getAdmin().createTable(tableDescriptor);
887    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
888
889    // new a connection for lower retry number.
890    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
891    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
892    try (Connection con = ConnectionFactory.createConnection(copy)) {
893      byte[] row = Bytes.toBytes("ROW-0");
894      byte[] rowLocked= Bytes.toBytes("ROW-1");
895      byte[] value0 = Bytes.toBytes("VALUE-0");
896      byte[] value1 = Bytes.toBytes("VALUE-1");
897      byte[] value2 = Bytes.toBytes("VALUE-2");
898      assertNoLocks(tableName);
899      ExecutorService putService = Executors.newSingleThreadExecutor();
900      putService.execute(() -> {
901        try (Table table = con.getTable(tableName)) {
902          Put put0 = new Put(rowLocked);
903          put0.addColumn(FAMILY, QUALIFIER, value0);
904          // the put will be blocked by WaitingForMultiMutationsObserver.
905          table.put(put0);
906        } catch (IOException ex) {
907          throw new RuntimeException(ex);
908        }
909      });
910      ExecutorService cpService = Executors.newSingleThreadExecutor();
911      AtomicBoolean exceptionDuringMutateRows = new AtomicBoolean();
912      cpService.execute(() -> {
913        Put put1 = new Put(row);
914        Put put2 = new Put(rowLocked);
915        put1.addColumn(FAMILY, QUALIFIER, value1);
916        put2.addColumn(FAMILY, QUALIFIER, value2);
917        try (Table table = con.getTable(tableName)) {
918          MultiRowMutationProtos.MutateRowsRequest request =
919            MultiRowMutationProtos.MutateRowsRequest.newBuilder()
920              .addMutationRequest(
921                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put1))
922              .addMutationRequest(
923                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put2))
924              .build();
925          table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
926              ROW, ROW,
927            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
928              ServerRpcController controller = new ServerRpcController();
929              CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
930                rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
931              exe.mutateRows(controller, request, rpcCallback);
932              if (controller.failedOnException() &&
933                      !(controller.getFailedOn() instanceof UnknownProtocolException)) {
934                exceptionDuringMutateRows.set(true);
935              }
936              return rpcCallback.get();
937            });
938        } catch (Throwable ex) {
939          LOG.error("encountered " + ex);
940        }
941      });
942      cpService.shutdown();
943      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
944      WaitingForMultiMutationsObserver observer = find(tableName,
945          WaitingForMultiMutationsObserver.class);
946      observer.latch.countDown();
947      putService.shutdown();
948      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
949      try (Table table = con.getTable(tableName)) {
950        Get g0 = new Get(row);
951        Get g1 = new Get(rowLocked);
952        Result r0 = table.get(g0);
953        Result r1 = table.get(g1);
954        assertTrue(r0.isEmpty());
955        assertFalse(r1.isEmpty());
956        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
957      }
958      assertNoLocks(tableName);
959      if (!exceptionDuringMutateRows.get()) {
960        fail("This cp should fail because the target lock is blocked by previous put");
961      }
962    }
963  }
964
965  /**
966   * A test case for issue HBASE-17482
967   * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped
968   * onto cells in the append thread, a countdown latch is used to ensure that happened
969   * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698)
970   * make the seqid/mvcc acquirement in handler thread and stamping in append thread
971   * No countdown latch to assure cells in memstore are stamped with seqid/mvcc.
972   * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner
973   * with a smaller readpoint can see these data, which disobey the multi version
974   * concurrency control rules.
975   * This test case is to reproduce this scenario.
976   * @throws IOException
977   */
978  @Test
979  public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException {
980    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
981      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
982      //put two row first to init the scanner
983      Put put = new Put(Bytes.toBytes("0"));
984      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
985      table.put(put);
986      put = new Put(Bytes.toBytes("00"));
987      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
988      table.put(put);
989      Scan scan = new Scan();
990      scan.setTimeRange(0, Long.MAX_VALUE);
991      scan.setCaching(1);
992      ResultScanner scanner = table.getScanner(scan);
993      int rowNum = scanner.next() != null ? 1 : 0;
994      //the started scanner shouldn't see the rows put below
995      for (int i = 1; i < 1000; i++) {
996        put = new Put(Bytes.toBytes(String.valueOf(i)));
997        put.setDurability(Durability.ASYNC_WAL);
998        put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i));
999        table.put(put);
1000      }
1001      for (Result result : scanner) {
1002        rowNum++;
1003      }
1004      //scanner should only see two rows
1005      assertEquals(2, rowNum);
1006      scanner = table.getScanner(scan);
1007      rowNum = 0;
1008      for (Result result : scanner) {
1009        rowNum++;
1010      }
1011      // the new scanner should see all rows
1012      assertEquals(1001, rowNum);
1013    }
1014  }
1015
1016  @Test
1017  public void testPutThenGetWithMultipleThreads() throws Exception {
1018    final int THREAD_NUM = 20;
1019    final int ROUND_NUM = 10;
1020    for (int round = 0; round < ROUND_NUM; round++) {
1021      ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
1022      final AtomicInteger successCnt = new AtomicInteger(0);
1023      try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
1024        TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1025
1026        for (int i = 0; i < THREAD_NUM; i++) {
1027          final int index = i;
1028          Thread t = new Thread(new Runnable() {
1029
1030            @Override
1031            public void run() {
1032              final byte[] row = Bytes.toBytes("row-" + index);
1033              final byte[] value = Bytes.toBytes("v" + index);
1034              try {
1035                Put put = new Put(row);
1036                put.addColumn(FAMILY, QUALIFIER, value);
1037                ht.put(put);
1038                Get get = new Get(row);
1039                Result result = ht.get(get);
1040                byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
1041                if (Bytes.equals(value, returnedValue)) {
1042                  successCnt.getAndIncrement();
1043                } else {
1044                  LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
1045                          + ", returned value: "
1046                          + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
1047                }
1048              } catch (Throwable e) {
1049                // do nothing
1050              }
1051            }
1052          });
1053          threads.add(t);
1054        }
1055        for (Thread t : threads) {
1056          t.start();
1057        }
1058        for (Thread t : threads) {
1059          t.join();
1060        }
1061        assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
1062      }
1063      TEST_UTIL.deleteTable(tableName);
1064    }
1065  }
1066
1067  private static void assertNoLocks(final TableName tableName)
1068          throws IOException, InterruptedException {
1069    HRegion region = (HRegion) find(tableName);
1070    assertEquals(0, region.getLockedRows().size());
1071  }
1072  private static HRegion find(final TableName tableName)
1073      throws IOException, InterruptedException {
1074    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
1075    List<HRegion> regions = rs.getRegions(tableName);
1076    assertEquals(1, regions.size());
1077    return regions.get(0);
1078  }
1079
1080  private static <T extends RegionObserver> T find(final TableName tableName,
1081          Class<T> clz) throws IOException, InterruptedException {
1082    HRegion region = find(tableName);
1083    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
1084    assertTrue("The cp instance should be " + clz.getName()
1085            + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
1086    return clz.cast(cp);
1087  }
1088
1089  public static class WaitingForMultiMutationsObserver
1090      implements RegionCoprocessor, RegionObserver {
1091    final CountDownLatch latch = new CountDownLatch(1);
1092
1093    @Override
1094    public Optional<RegionObserver> getRegionObserver() {
1095      return Optional.of(this);
1096    }
1097
1098    @Override
1099    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1100            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1101      try {
1102        latch.await();
1103      } catch (InterruptedException ex) {
1104        throw new IOException(ex);
1105      }
1106    }
1107  }
1108
1109  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
1110    private final CountDownLatch latch = new CountDownLatch(1);
1111
1112    @Override
1113    public Optional<RegionObserver> getRegionObserver() {
1114      return Optional.of(this);
1115    }
1116
1117    @Override
1118    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1119            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1120      try {
1121        // waiting for scanner
1122        latch.await();
1123      } catch (InterruptedException ex) {
1124        throw new IOException(ex);
1125      }
1126    }
1127
1128    @Override
1129    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
1130            final Scan scan, final RegionScanner s) throws IOException {
1131      latch.countDown();
1132      return s;
1133    }
1134  }
1135
1136  static byte[] generateHugeValue(int size) {
1137    Random rand = ThreadLocalRandom.current();
1138    byte[] value = new byte[size];
1139    for (int i = 0; i < value.length; i++) {
1140      value[i] = (byte) rand.nextInt(256);
1141    }
1142    return value;
1143  }
1144
1145  @Test
1146  public void testScanWithBatchSizeReturnIncompleteCells() throws IOException, InterruptedException {
1147    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
1148            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
1149            .build();
1150    try (Table table = TEST_UTIL.createTable(hd, null)) {
1151      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1152
1153      Put put = new Put(ROW);
1154      put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
1155      table.put(put);
1156
1157      put = new Put(ROW);
1158      put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
1159      table.put(put);
1160
1161      for (int i = 2; i < 5; i++) {
1162        for (int version = 0; version < 2; version++) {
1163          put = new Put(ROW);
1164          put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
1165          table.put(put);
1166        }
1167      }
1168
1169      Scan scan = new Scan();
1170      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
1171              .setMaxResultSize(4 * 1024 * 1024);
1172      Result result;
1173      try (ResultScanner scanner = table.getScanner(scan)) {
1174        List<Result> list = new ArrayList<>();
1175        /*
1176         * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The second
1177         * scan rpc should return a result with 3 cells, because reach the batch limit = 3; The
1178         * mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
1179         * moreResultsInRegion also would be false. Finally, the client should collect all the cells
1180         * into two result: 2+3 -> 3+2;
1181         */
1182        while ((result = scanner.next()) != null) {
1183          list.add(result);
1184        }
1185
1186        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1187        Assert.assertEquals(2, list.size());
1188        Assert.assertEquals(3, list.get(0).size());
1189        Assert.assertEquals(2, list.get(1).size());
1190      }
1191
1192      scan = new Scan();
1193      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
1194              .setMaxResultSize(4 * 1024 * 1024);
1195      try (ResultScanner scanner = table.getScanner(scan)) {
1196        List<Result> list = new ArrayList<>();
1197        while ((result = scanner.next()) != null) {
1198          list.add(result);
1199        }
1200        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1201        Assert.assertEquals(3, list.size());
1202        Assert.assertEquals(2, list.get(0).size());
1203        Assert.assertEquals(2, list.get(1).size());
1204        Assert.assertEquals(1, list.get(2).size());
1205      }
1206    }
1207  }
1208}