001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Optional;
032import java.util.Random;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.CellUtil;
043import org.apache.hadoop.hbase.Coprocessor;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseTestingUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.RegionMetrics;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
052import org.apache.hadoop.hbase.coprocessor.ObserverContext;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
055import org.apache.hadoop.hbase.coprocessor.RegionObserver;
056import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
057import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
058import org.apache.hadoop.hbase.ipc.RpcClient;
059import org.apache.hadoop.hbase.ipc.RpcClientFactory;
060import org.apache.hadoop.hbase.ipc.ServerRpcController;
061import org.apache.hadoop.hbase.regionserver.HRegion;
062import org.apache.hadoop.hbase.regionserver.HRegionServer;
063import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
064import org.apache.hadoop.hbase.regionserver.RegionScanner;
065import org.apache.hadoop.hbase.testclassification.ClientTests;
066import org.apache.hadoop.hbase.testclassification.LargeTests;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
069import org.junit.After;
070import org.junit.AfterClass;
071import org.junit.Assert;
072import org.junit.Before;
073import org.junit.BeforeClass;
074import org.junit.ClassRule;
075import org.junit.Rule;
076import org.junit.Test;
077import org.junit.experimental.categories.Category;
078import org.junit.rules.TestName;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos;
085
086@Category({LargeTests.class, ClientTests.class})
087public class TestFromClientSide3 {
088
089  @ClassRule
090  public static final HBaseClassTestRule CLASS_RULE =
091      HBaseClassTestRule.forClass(TestFromClientSide3.class);
092
093  private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class);
094  private final static HBaseTestingUtil TEST_UTIL
095    = new HBaseTestingUtil();
096  private static final int WAITTABLE_MILLIS = 10000;
097  private static byte[] FAMILY = Bytes.toBytes("testFamily");
098  private static Random random = new Random();
099  private static int SLAVES = 3;
100  private static final byte[] ROW = Bytes.toBytes("testRow");
101  private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow");
102  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
103  private static final byte[] VALUE = Bytes.toBytes("testValue");
104  private static final byte[] COL_QUAL = Bytes.toBytes("f1");
105  private static final byte[] VAL_BYTES = Bytes.toBytes("v1");
106  private static final byte[] ROW_BYTES = Bytes.toBytes("r1");
107
108  @Rule
109  public TestName name = new TestName();
110  private TableName tableName;
111
112  /**
113   * @throws java.lang.Exception
114   */
115  @BeforeClass
116  public static void setUpBeforeClass() throws Exception {
117    TEST_UTIL.startMiniCluster(SLAVES);
118  }
119
120  /**
121   * @throws java.lang.Exception
122   */
123  @AfterClass
124  public static void tearDownAfterClass() throws Exception {
125    TEST_UTIL.shutdownMiniCluster();
126  }
127
128  @Before
129  public void setUp() throws Exception {
130    tableName = TableName.valueOf(name.getMethodName());
131  }
132
133  @After
134  public void tearDown() throws Exception {
135    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
136      LOG.info("Tear down, remove table=" + htd.getTableName());
137      TEST_UTIL.deleteTable(htd.getTableName());
138    }
139  }
140
141  private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts)
142      throws Exception {
143    Put put = new Put(row);
144    for (int i = 0; i < nPuts; i++) {
145      byte[] qualifier = Bytes.toBytes(random.nextInt());
146      byte[] value = Bytes.toBytes(random.nextInt());
147      put.addColumn(family, qualifier, value);
148    }
149    table.put(put);
150  }
151
152  private void performMultiplePutAndFlush(Admin admin, Table table, byte[] row, byte[] family,
153      int nFlushes, int nPuts) throws Exception {
154    for (int i = 0; i < nFlushes; i++) {
155      randomCFPuts(table, row, family, nPuts);
156      admin.flush(table.getName());
157    }
158  }
159
160  private static List<Cell> toList(ResultScanner scanner) {
161    try {
162      List<Cell> cells = new ArrayList<>();
163      for (Result r : scanner) {
164        cells.addAll(r.listCells());
165      }
166      return cells;
167    } finally {
168      scanner.close();
169    }
170  }
171
172  @Test
173  public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException {
174    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
175      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
176      byte[] row = Bytes.toBytes("SpecifiedRow");
177      byte[] value0 = Bytes.toBytes("value_0");
178      byte[] value1 = Bytes.toBytes("value_1");
179      Put put = new Put(row);
180      put.addColumn(FAMILY, QUALIFIER, VALUE);
181      table.put(put);
182      Delete d = new Delete(row);
183      table.delete(d);
184      put = new Put(row);
185      put.addColumn(FAMILY, null, value0);
186      table.put(put);
187      put = new Put(row);
188      put.addColumn(FAMILY, null, value1);
189      table.put(put);
190      List<Cell> cells = toList(table.getScanner(new Scan()));
191      assertEquals(1, cells.size());
192      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
193
194      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
195      assertEquals(1, cells.size());
196      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
197
198      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
199      assertEquals(0, cells.size());
200
201      TEST_UTIL.getAdmin().flush(tableName);
202      cells = toList(table.getScanner(new Scan()));
203      assertEquals(1, cells.size());
204      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
205
206      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
207      assertEquals(1, cells.size());
208      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
209
210      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
211      assertEquals(0, cells.size());
212    }
213  }
214
215  @Test
216  public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException {
217    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
218      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
219      byte[] row = Bytes.toBytes("SpecifiedRow");
220      byte[] qual0 = Bytes.toBytes("qual0");
221      byte[] qual1 = Bytes.toBytes("qual1");
222      long now = EnvironmentEdgeManager.currentTime();
223      Delete d = new Delete(row, now);
224      table.delete(d);
225
226      Put put = new Put(row);
227      put.addColumn(FAMILY, null, now + 1, VALUE);
228      table.put(put);
229
230      put = new Put(row);
231      put.addColumn(FAMILY, qual1, now + 2, qual1);
232      table.put(put);
233
234      put = new Put(row);
235      put.addColumn(FAMILY, qual0, now + 3, qual0);
236      table.put(put);
237
238      Result r = table.get(new Get(row));
239      assertEquals(r.toString(), 3, r.size());
240      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
241      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
242      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
243
244      TEST_UTIL.getAdmin().flush(tableName);
245      r = table.get(new Get(row));
246      assertEquals(3, r.size());
247      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
248      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
249      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
250    }
251  }
252
253  private int getStoreFileCount(Admin admin, ServerName serverName, RegionInfo region)
254      throws IOException {
255    for (RegionMetrics metrics : admin.getRegionMetrics(serverName, region.getTable())) {
256      if (Bytes.equals(region.getRegionName(), metrics.getRegionName())) {
257        return metrics.getStoreFileCount();
258      }
259    }
260    return 0;
261  }
262
263  // override the config settings at the CF level and ensure priority
264  @Test
265  public void testAdvancedConfigOverride() throws Exception {
266    /*
267     * Overall idea: (1) create 3 store files and issue a compaction. config's
268     * compaction.min == 3, so should work. (2) Increase the compaction.min
269     * toggle in the HTD to 5 and modify table. If we use the HTD value instead
270     * of the default config value, adding 3 files and issuing a compaction
271     * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2
272     * and modify table. The CF schema should override the Table schema and now
273     * cause a minor compaction.
274     */
275    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
276
277    final TableName tableName = TableName.valueOf(name.getMethodName());
278    try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 10)) {
279      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
280      Admin admin = TEST_UTIL.getAdmin();
281
282      // Create 3 store files.
283      byte[] row = Bytes.toBytes(random.nextInt());
284      performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 100);
285
286      try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
287        // Verify we have multiple store files.
288        HRegionLocation loc = locator.getRegionLocation(row, true);
289        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) > 1);
290
291        // Issue a compaction request
292        admin.compact(tableName);
293
294        // poll wait for the compactions to happen
295        for (int i = 0; i < 10 * 1000 / 40; ++i) {
296          // The number of store files after compaction should be lesser.
297          loc = locator.getRegionLocation(row, true);
298          if (!loc.getRegion().isOffline()) {
299            if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1) {
300              break;
301            }
302          }
303          Thread.sleep(40);
304        }
305        // verify the compactions took place and that we didn't just time out
306        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1);
307
308        // change the compaction.min config option for this table to 5
309        LOG.info("hbase.hstore.compaction.min should now be 5");
310        TableDescriptor htd = TableDescriptorBuilder.newBuilder(table.getDescriptor())
311          .setValue("hbase.hstore.compaction.min", String.valueOf(5)).build();
312        admin.modifyTable(htd);
313        LOG.info("alter status finished");
314
315        // Create 3 more store files.
316        performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 10);
317
318        // Issue a compaction request
319        admin.compact(tableName);
320
321        // This time, the compaction request should not happen
322        Thread.sleep(10 * 1000);
323        loc = locator.getRegionLocation(row, true);
324        int sfCount = getStoreFileCount(admin, loc.getServerName(), loc.getRegion());
325        assertTrue(sfCount > 1);
326
327        // change an individual CF's config option to 2 & online schema update
328        LOG.info("hbase.hstore.compaction.min should now be 2");
329        htd = TableDescriptorBuilder.newBuilder(htd)
330          .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
331            .setValue("hbase.hstore.compaction.min", String.valueOf(2)).build())
332          .build();
333        admin.modifyTable(htd);
334        LOG.info("alter status finished");
335
336        // Issue a compaction request
337        admin.compact(tableName);
338
339        // poll wait for the compactions to happen
340        for (int i = 0; i < 10 * 1000 / 40; ++i) {
341          loc = locator.getRegionLocation(row, true);
342          try {
343            if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount) {
344              break;
345            }
346          } catch (Exception e) {
347            LOG.debug("Waiting for region to come online: " +
348              Bytes.toStringBinary(loc.getRegion().getRegionName()));
349          }
350          Thread.sleep(40);
351        }
352
353        // verify the compaction took place and that we didn't just time out
354        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount);
355
356        // Finally, ensure that we can remove a custom config value after we made it
357        LOG.info("Removing CF config value");
358        LOG.info("hbase.hstore.compaction.min should now be 5");
359        htd = TableDescriptorBuilder.newBuilder(htd)
360          .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
361            .setValue("hbase.hstore.compaction.min", null).build())
362          .build();
363        admin.modifyTable(htd);
364        LOG.info("alter status finished");
365        assertNull(table.getDescriptor().getColumnFamily(FAMILY)
366          .getValue(Bytes.toBytes("hbase.hstore.compaction.min")));
367      }
368    }
369  }
370
371  @Test
372  public void testHTableBatchWithEmptyPut () throws IOException, InterruptedException {
373    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
374      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
375      List actions = (List) new ArrayList();
376      Object[] results = new Object[2];
377      // create an empty Put
378      Put put1 = new Put(ROW);
379      actions.add(put1);
380
381      Put put2 = new Put(ANOTHERROW);
382      put2.addColumn(FAMILY, QUALIFIER, VALUE);
383      actions.add(put2);
384
385      table.batch(actions, results);
386      fail("Empty Put should have failed the batch call");
387    } catch (IllegalArgumentException iae) {
388    }
389  }
390
391  // Test Table.batch with large amount of mutations against the same key.
392  // It used to trigger read lock's "Maximum lock count exceeded" Error.
393  @Test
394  public void testHTableWithLargeBatch() throws IOException, InterruptedException {
395    int sixtyFourK = 64 * 1024;
396    List actions = new ArrayList();
397    Object[] results = new Object[(sixtyFourK + 1) * 2];
398
399    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
400      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
401
402      for (int i = 0; i < sixtyFourK + 1; i++) {
403        Put put1 = new Put(ROW);
404        put1.addColumn(FAMILY, QUALIFIER, VALUE);
405        actions.add(put1);
406
407        Put put2 = new Put(ANOTHERROW);
408        put2.addColumn(FAMILY, QUALIFIER, VALUE);
409        actions.add(put2);
410      }
411
412      table.batch(actions, results);
413    }
414  }
415
416  @Test
417  public void testBatchWithRowMutation() throws Exception {
418    LOG.info("Starting testBatchWithRowMutation");
419    byte [][] QUALIFIERS = new byte [][] {
420      Bytes.toBytes("a"), Bytes.toBytes("b")
421    };
422
423    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
424      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
425
426      RowMutations arm = RowMutations.of(Collections.singletonList(
427              new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE)));
428      Object[] batchResult = new Object[1];
429      table.batch(Arrays.asList(arm), batchResult);
430
431      Get g = new Get(ROW);
432      Result r = table.get(g);
433      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
434
435      arm = RowMutations.of(Arrays.asList(
436              new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE),
437              new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0])));
438      table.batch(Arrays.asList(arm), batchResult);
439      r = table.get(g);
440      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
441      assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
442
443      // Test that we get the correct remote exception for RowMutations from batch()
444      try {
445        arm = RowMutations.of(Collections.singletonList(
446                new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE)));
447        table.batch(Arrays.asList(arm), batchResult);
448        fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
449      } catch(RetriesExhaustedException e) {
450        String msg = e.getMessage();
451        assertTrue(msg.contains("NoSuchColumnFamilyException"));
452      }
453    }
454  }
455
456  @Test
457  public void testBatchWithCheckAndMutate() throws Exception {
458    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
459      byte[] row1 = Bytes.toBytes("row1");
460      byte[] row2 = Bytes.toBytes("row2");
461      byte[] row3 = Bytes.toBytes("row3");
462      byte[] row4 = Bytes.toBytes("row4");
463      byte[] row5 = Bytes.toBytes("row5");
464      byte[] row6 = Bytes.toBytes("row6");
465      byte[] row7 = Bytes.toBytes("row7");
466
467      table.put(Arrays.asList(
468        new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
469        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
470        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
471        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
472        new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
473        new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
474        new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))));
475
476      CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
477        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
478        .build(new RowMutations(row1)
479          .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
480          .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
481          .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
482          .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
483      Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
484      RowMutations mutations = new RowMutations(row3)
485        .add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
486        .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
487        .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
488        .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
489      CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
490        .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
491        .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
492      Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
493      CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6)
494        .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
495        .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
496      CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7)
497        .ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
498        .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
499
500      List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
501        checkAndMutate3, checkAndMutate4);
502      Object[] results = new Object[actions.size()];
503      table.batch(actions, results);
504
505      CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0];
506      assertTrue(checkAndMutateResult.isSuccess());
507      assertEquals(3L,
508        Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
509      assertEquals("d",
510        Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
511
512      assertEquals("b",
513        Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
514
515      Result result = (Result) results[2];
516      assertTrue(result.getExists());
517      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
518      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
519
520      checkAndMutateResult = (CheckAndMutateResult) results[3];
521      assertFalse(checkAndMutateResult.isSuccess());
522      assertNull(checkAndMutateResult.getResult());
523
524      assertTrue(((Result) results[4]).isEmpty());
525
526      checkAndMutateResult = (CheckAndMutateResult) results[5];
527      assertTrue(checkAndMutateResult.isSuccess());
528      assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
529        .getValue(FAMILY, Bytes.toBytes("F"))));
530
531      checkAndMutateResult = (CheckAndMutateResult) results[6];
532      assertTrue(checkAndMutateResult.isSuccess());
533      assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
534        .getValue(FAMILY, Bytes.toBytes("G"))));
535
536      result = table.get(new Get(row1));
537      assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
538      assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
539      assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
540      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
541
542      result = table.get(new Get(row3));
543      assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
544      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
545      assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
546      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
547      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
548
549      result = table.get(new Get(row4));
550      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
551
552      result = table.get(new Get(row5));
553      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
554
555      result = table.get(new Get(row6));
556      assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
557
558      result = table.get(new Get(row7));
559      assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
560    }
561  }
562
563  @Test
564  public void testHTableExistsMethodSingleRegionSingleGet()
565          throws IOException, InterruptedException {
566    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
567      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
568
569      // Test with a single region table.
570      Put put = new Put(ROW);
571      put.addColumn(FAMILY, QUALIFIER, VALUE);
572
573      Get get = new Get(ROW);
574
575      boolean exist = table.exists(get);
576      assertFalse(exist);
577
578      table.put(put);
579
580      exist = table.exists(get);
581      assertTrue(exist);
582    }
583  }
584
585  @Test
586  public void testHTableExistsMethodSingleRegionMultipleGets()
587          throws IOException, InterruptedException {
588    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
589      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
590
591      Put put = new Put(ROW);
592      put.addColumn(FAMILY, QUALIFIER, VALUE);
593      table.put(put);
594
595      List<Get> gets = new ArrayList<>();
596      gets.add(new Get(ROW));
597      gets.add(new Get(ANOTHERROW));
598
599      boolean[] results = table.exists(gets);
600      assertTrue(results[0]);
601      assertFalse(results[1]);
602    }
603  }
604
605  @Test
606  public void testHTableExistsBeforeGet() throws IOException, InterruptedException {
607    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
608      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
609
610      Put put = new Put(ROW);
611      put.addColumn(FAMILY, QUALIFIER, VALUE);
612      table.put(put);
613
614      Get get = new Get(ROW);
615
616      boolean exist = table.exists(get);
617      assertEquals(true, exist);
618
619      Result result = table.get(get);
620      assertEquals(false, result.isEmpty());
621      assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)));
622    }
623  }
624
625  @Test
626  public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException {
627    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
628      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
629
630      final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2"));
631      Put put = new Put(ROW);
632      put.addColumn(FAMILY, QUALIFIER, VALUE);
633      table.put(put);
634      put = new Put(ROW2);
635      put.addColumn(FAMILY, QUALIFIER, VALUE);
636      table.put(put);
637
638      Get get = new Get(ROW);
639      Get get2 = new Get(ROW2);
640      ArrayList<Get> getList = new ArrayList(2);
641      getList.add(get);
642      getList.add(get2);
643
644      boolean[] exists = table.exists(getList);
645      assertEquals(true, exists[0]);
646      assertEquals(true, exists[1]);
647
648      Result[] result = table.get(getList);
649      assertEquals(false, result[0].isEmpty());
650      assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER)));
651      assertEquals(false, result[1].isEmpty());
652      assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER)));
653    }
654  }
655
656  @Test
657  public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
658    try (Table table = TEST_UTIL.createTable(
659      tableName, new byte[][] { FAMILY },
660      1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
661      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
662
663      Put put = new Put(ROW);
664      put.addColumn(FAMILY, QUALIFIER, VALUE);
665
666      Get get = new Get(ROW);
667
668      boolean exist = table.exists(get);
669      assertFalse(exist);
670
671      table.put(put);
672
673      exist = table.exists(get);
674      assertTrue(exist);
675    }
676  }
677
678  @Test
679  public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
680    try (Table table = TEST_UTIL.createTable(
681      tableName,
682      new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
683      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
684
685      Put put = new Put(ROW);
686      put.addColumn(FAMILY, QUALIFIER, VALUE);
687      table.put(put);
688
689      List<Get> gets = new ArrayList<>();
690      gets.add(new Get(ANOTHERROW));
691      gets.add(new Get(Bytes.add(ROW, new byte[]{0x00})));
692      gets.add(new Get(ROW));
693      gets.add(new Get(Bytes.add(ANOTHERROW, new byte[]{0x00})));
694
695      LOG.info("Calling exists");
696      boolean[] results = table.exists(gets);
697      assertFalse(results[0]);
698      assertFalse(results[1]);
699      assertTrue(results[2]);
700      assertFalse(results[3]);
701
702      // Test with the first region.
703      put = new Put(new byte[]{0x00});
704      put.addColumn(FAMILY, QUALIFIER, VALUE);
705      table.put(put);
706
707      gets = new ArrayList<>();
708      gets.add(new Get(new byte[]{0x00}));
709      gets.add(new Get(new byte[]{0x00, 0x00}));
710      results = table.exists(gets);
711      assertTrue(results[0]);
712      assertFalse(results[1]);
713
714      // Test with the last region
715      put = new Put(new byte[]{(byte) 0xff, (byte) 0xff});
716      put.addColumn(FAMILY, QUALIFIER, VALUE);
717      table.put(put);
718
719      gets = new ArrayList<>();
720      gets.add(new Get(new byte[]{(byte) 0xff}));
721      gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff}));
722      gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff}));
723      results = table.exists(gets);
724      assertFalse(results[0]);
725      assertTrue(results[1]);
726      assertFalse(results[2]);
727    }
728  }
729
730  @Test
731  public void testGetEmptyRow() throws Exception {
732    //Create a table and put in 1 row
733    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
734      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
735
736      Put put = new Put(ROW_BYTES);
737      put.addColumn(FAMILY, COL_QUAL, VAL_BYTES);
738      table.put(put);
739
740      //Try getting the row with an empty row key
741      Result res = null;
742      try {
743        res = table.get(new Get(new byte[0]));
744        fail();
745      } catch (IllegalArgumentException e) {
746        // Expected.
747      }
748      assertTrue(res == null);
749      res = table.get(new Get(Bytes.toBytes("r1-not-exist")));
750      assertTrue(res.isEmpty() == true);
751      res = table.get(new Get(ROW_BYTES));
752      assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
753    }
754  }
755
756  @Test
757  public void testConnectionDefaultUsesCodec() throws Exception {
758    try (
759      RpcClient client = RpcClientFactory.createClient(TEST_UTIL.getConfiguration(), "cluster")) {
760      assertTrue(client.hasCellBlockSupport());
761    }
762  }
763
764  @Test
765  public void testPutWithPreBatchMutate() throws Exception {
766    testPreBatchMutate(tableName, () -> {
767      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
768        Put put = new Put(ROW);
769        put.addColumn(FAMILY, QUALIFIER, VALUE);
770        t.put(put);
771      } catch (IOException ex) {
772        throw new RuntimeException(ex);
773      }
774    });
775  }
776
777  @Test
778  public void testRowMutationsWithPreBatchMutate() throws Exception {
779    testPreBatchMutate(tableName, () -> {
780      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
781        RowMutations rm = new RowMutations(ROW, 1);
782        Put put = new Put(ROW);
783        put.addColumn(FAMILY, QUALIFIER, VALUE);
784        rm.add(put);
785        t.mutateRow(rm);
786      } catch (IOException ex) {
787        throw new RuntimeException(ex);
788      }
789    });
790  }
791
792  private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception {
793    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
794      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
795      .setCoprocessor(WaitingForScanObserver.class.getName()).build();
796    TEST_UTIL.getAdmin().createTable(tableDescriptor);
797    // Don't use waitTableAvailable(), because the scanner will mess up the co-processor
798
799    ExecutorService service = Executors.newFixedThreadPool(2);
800    service.execute(rn);
801    final List<Cell> cells = new ArrayList<>();
802    service.execute(() -> {
803      try {
804        // waiting for update.
805        TimeUnit.SECONDS.sleep(3);
806        try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
807          Scan scan = new Scan();
808          try (ResultScanner scanner = t.getScanner(scan)) {
809            for (Result r : scanner) {
810              cells.addAll(Arrays.asList(r.rawCells()));
811            }
812          }
813        }
814      } catch (IOException | InterruptedException ex) {
815        throw new RuntimeException(ex);
816      }
817    });
818    service.shutdown();
819    service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
820    assertEquals("The write is blocking by RegionObserver#postBatchMutate"
821            + ", so the data is invisible to reader", 0, cells.size());
822    TEST_UTIL.deleteTable(tableName);
823  }
824
825  @Test
826  public void testLockLeakWithDelta() throws Exception, Throwable {
827    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
828      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
829      .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
830      .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
831    TEST_UTIL.getAdmin().createTable(tableDescriptor);
832    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
833
834    // new a connection for lower retry number.
835    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
836    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
837    try (Connection con = ConnectionFactory.createConnection(copy)) {
838      HRegion region = (HRegion) find(tableName);
839      region.setTimeoutForWriteLock(10);
840      ExecutorService putService = Executors.newSingleThreadExecutor();
841      putService.execute(() -> {
842        try (Table table = con.getTable(tableName)) {
843          Put put = new Put(ROW);
844          put.addColumn(FAMILY, QUALIFIER, VALUE);
845          // the put will be blocked by WaitingForMultiMutationsObserver.
846          table.put(put);
847        } catch (IOException ex) {
848          throw new RuntimeException(ex);
849        }
850      });
851      ExecutorService appendService = Executors.newSingleThreadExecutor();
852      appendService.execute(() -> {
853        Append append = new Append(ROW);
854        append.addColumn(FAMILY, QUALIFIER, VALUE);
855        try (Table table = con.getTable(tableName)) {
856          table.append(append);
857          fail("The APPEND should fail because the target lock is blocked by previous put");
858        } catch (Exception ex) {
859        }
860      });
861      appendService.shutdown();
862      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
863      WaitingForMultiMutationsObserver observer =
864              find(tableName, WaitingForMultiMutationsObserver.class);
865      observer.latch.countDown();
866      putService.shutdown();
867      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
868      try (Table table = con.getTable(tableName)) {
869        Result r = table.get(new Get(ROW));
870        assertFalse(r.isEmpty());
871        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
872      }
873    }
874    HRegion region = (HRegion) find(tableName);
875    int readLockCount = region.getReadLockCount();
876    LOG.info("readLockCount:" + readLockCount);
877    assertEquals(0, readLockCount);
878  }
879
880  @Test
881  public void testMultiRowMutations() throws Exception, Throwable {
882    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
883      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
884      .setCoprocessor(MultiRowMutationEndpoint.class.getName())
885      .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
886      .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
887    TEST_UTIL.getAdmin().createTable(tableDescriptor);
888    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
889
890    // new a connection for lower retry number.
891    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
892    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
893    try (Connection con = ConnectionFactory.createConnection(copy)) {
894      byte[] row = Bytes.toBytes("ROW-0");
895      byte[] rowLocked= Bytes.toBytes("ROW-1");
896      byte[] value0 = Bytes.toBytes("VALUE-0");
897      byte[] value1 = Bytes.toBytes("VALUE-1");
898      byte[] value2 = Bytes.toBytes("VALUE-2");
899      assertNoLocks(tableName);
900      ExecutorService putService = Executors.newSingleThreadExecutor();
901      putService.execute(() -> {
902        try (Table table = con.getTable(tableName)) {
903          Put put0 = new Put(rowLocked);
904          put0.addColumn(FAMILY, QUALIFIER, value0);
905          // the put will be blocked by WaitingForMultiMutationsObserver.
906          table.put(put0);
907        } catch (IOException ex) {
908          throw new RuntimeException(ex);
909        }
910      });
911      ExecutorService cpService = Executors.newSingleThreadExecutor();
912      AtomicBoolean exceptionDuringMutateRows = new AtomicBoolean();
913      cpService.execute(() -> {
914        Put put1 = new Put(row);
915        Put put2 = new Put(rowLocked);
916        put1.addColumn(FAMILY, QUALIFIER, value1);
917        put2.addColumn(FAMILY, QUALIFIER, value2);
918        try (Table table = con.getTable(tableName)) {
919          MultiRowMutationProtos.MutateRowsRequest request =
920            MultiRowMutationProtos.MutateRowsRequest.newBuilder()
921              .addMutationRequest(
922                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put1))
923              .addMutationRequest(
924                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put2))
925              .build();
926          table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
927              ROW, ROW,
928            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
929              ServerRpcController controller = new ServerRpcController();
930              CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
931                rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
932              exe.mutateRows(controller, request, rpcCallback);
933              if (controller.failedOnException() &&
934                      !(controller.getFailedOn() instanceof UnknownProtocolException)) {
935                exceptionDuringMutateRows.set(true);
936              }
937              return rpcCallback.get();
938            });
939        } catch (Throwable ex) {
940          LOG.error("encountered " + ex);
941        }
942      });
943      cpService.shutdown();
944      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
945      WaitingForMultiMutationsObserver observer = find(tableName,
946          WaitingForMultiMutationsObserver.class);
947      observer.latch.countDown();
948      putService.shutdown();
949      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
950      try (Table table = con.getTable(tableName)) {
951        Get g0 = new Get(row);
952        Get g1 = new Get(rowLocked);
953        Result r0 = table.get(g0);
954        Result r1 = table.get(g1);
955        assertTrue(r0.isEmpty());
956        assertFalse(r1.isEmpty());
957        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
958      }
959      assertNoLocks(tableName);
960      if (!exceptionDuringMutateRows.get()) {
961        fail("This cp should fail because the target lock is blocked by previous put");
962      }
963    }
964  }
965
966  /**
967   * A test case for issue HBASE-17482
968   * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped
969   * onto cells in the append thread, a countdown latch is used to ensure that happened
970   * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698)
971   * make the seqid/mvcc acquirement in handler thread and stamping in append thread
972   * No countdown latch to assure cells in memstore are stamped with seqid/mvcc.
973   * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner
974   * with a smaller readpoint can see these data, which disobey the multi version
975   * concurrency control rules.
976   * This test case is to reproduce this scenario.
977   * @throws IOException
978   */
979  @Test
980  public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException {
981    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
982      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
983      //put two row first to init the scanner
984      Put put = new Put(Bytes.toBytes("0"));
985      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
986      table.put(put);
987      put = new Put(Bytes.toBytes("00"));
988      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
989      table.put(put);
990      Scan scan = new Scan();
991      scan.setTimeRange(0, Long.MAX_VALUE);
992      scan.setCaching(1);
993      ResultScanner scanner = table.getScanner(scan);
994      int rowNum = scanner.next() != null ? 1 : 0;
995      //the started scanner shouldn't see the rows put below
996      for (int i = 1; i < 1000; i++) {
997        put = new Put(Bytes.toBytes(String.valueOf(i)));
998        put.setDurability(Durability.ASYNC_WAL);
999        put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i));
1000        table.put(put);
1001      }
1002      for (Result result : scanner) {
1003        rowNum++;
1004      }
1005      //scanner should only see two rows
1006      assertEquals(2, rowNum);
1007      scanner = table.getScanner(scan);
1008      rowNum = 0;
1009      for (Result result : scanner) {
1010        rowNum++;
1011      }
1012      // the new scanner should see all rows
1013      assertEquals(1001, rowNum);
1014    }
1015  }
1016
1017  @Test
1018  public void testPutThenGetWithMultipleThreads() throws Exception {
1019    final int THREAD_NUM = 20;
1020    final int ROUND_NUM = 10;
1021    for (int round = 0; round < ROUND_NUM; round++) {
1022      ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
1023      final AtomicInteger successCnt = new AtomicInteger(0);
1024      try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
1025        TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1026
1027        for (int i = 0; i < THREAD_NUM; i++) {
1028          final int index = i;
1029          Thread t = new Thread(new Runnable() {
1030
1031            @Override
1032            public void run() {
1033              final byte[] row = Bytes.toBytes("row-" + index);
1034              final byte[] value = Bytes.toBytes("v" + index);
1035              try {
1036                Put put = new Put(row);
1037                put.addColumn(FAMILY, QUALIFIER, value);
1038                ht.put(put);
1039                Get get = new Get(row);
1040                Result result = ht.get(get);
1041                byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
1042                if (Bytes.equals(value, returnedValue)) {
1043                  successCnt.getAndIncrement();
1044                } else {
1045                  LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
1046                          + ", returned value: "
1047                          + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
1048                }
1049              } catch (Throwable e) {
1050                // do nothing
1051              }
1052            }
1053          });
1054          threads.add(t);
1055        }
1056        for (Thread t : threads) {
1057          t.start();
1058        }
1059        for (Thread t : threads) {
1060          t.join();
1061        }
1062        assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
1063      }
1064      TEST_UTIL.deleteTable(tableName);
1065    }
1066  }
1067
1068  private static void assertNoLocks(final TableName tableName)
1069          throws IOException, InterruptedException {
1070    HRegion region = (HRegion) find(tableName);
1071    assertEquals(0, region.getLockedRows().size());
1072  }
1073  private static HRegion find(final TableName tableName)
1074      throws IOException, InterruptedException {
1075    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
1076    List<HRegion> regions = rs.getRegions(tableName);
1077    assertEquals(1, regions.size());
1078    return regions.get(0);
1079  }
1080
1081  private static <T extends RegionObserver> T find(final TableName tableName,
1082          Class<T> clz) throws IOException, InterruptedException {
1083    HRegion region = find(tableName);
1084    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
1085    assertTrue("The cp instance should be " + clz.getName()
1086            + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
1087    return clz.cast(cp);
1088  }
1089
1090  public static class WaitingForMultiMutationsObserver
1091      implements RegionCoprocessor, RegionObserver {
1092    final CountDownLatch latch = new CountDownLatch(1);
1093
1094    @Override
1095    public Optional<RegionObserver> getRegionObserver() {
1096      return Optional.of(this);
1097    }
1098
1099    @Override
1100    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1101            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1102      try {
1103        latch.await();
1104      } catch (InterruptedException ex) {
1105        throw new IOException(ex);
1106      }
1107    }
1108  }
1109
1110  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
1111    private final CountDownLatch latch = new CountDownLatch(1);
1112
1113    @Override
1114    public Optional<RegionObserver> getRegionObserver() {
1115      return Optional.of(this);
1116    }
1117
1118    @Override
1119    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1120            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1121      try {
1122        // waiting for scanner
1123        latch.await();
1124      } catch (InterruptedException ex) {
1125        throw new IOException(ex);
1126      }
1127    }
1128
1129    @Override
1130    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
1131            final Scan scan, final RegionScanner s) throws IOException {
1132      latch.countDown();
1133      return s;
1134    }
1135  }
1136
1137  static byte[] generateHugeValue(int size) {
1138    Random rand = ThreadLocalRandom.current();
1139    byte[] value = new byte[size];
1140    for (int i = 0; i < value.length; i++) {
1141      value[i] = (byte) rand.nextInt(256);
1142    }
1143    return value;
1144  }
1145
1146  @Test
1147  public void testScanWithBatchSizeReturnIncompleteCells() throws IOException, InterruptedException {
1148    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
1149            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
1150            .build();
1151    try (Table table = TEST_UTIL.createTable(hd, null)) {
1152      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1153
1154      Put put = new Put(ROW);
1155      put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
1156      table.put(put);
1157
1158      put = new Put(ROW);
1159      put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
1160      table.put(put);
1161
1162      for (int i = 2; i < 5; i++) {
1163        for (int version = 0; version < 2; version++) {
1164          put = new Put(ROW);
1165          put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
1166          table.put(put);
1167        }
1168      }
1169
1170      Scan scan = new Scan();
1171      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
1172              .setMaxResultSize(4 * 1024 * 1024);
1173      Result result;
1174      try (ResultScanner scanner = table.getScanner(scan)) {
1175        List<Result> list = new ArrayList<>();
1176        /*
1177         * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The second
1178         * scan rpc should return a result with 3 cells, because reach the batch limit = 3; The
1179         * mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
1180         * moreResultsInRegion also would be false. Finally, the client should collect all the cells
1181         * into two result: 2+3 -> 3+2;
1182         */
1183        while ((result = scanner.next()) != null) {
1184          list.add(result);
1185        }
1186
1187        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1188        Assert.assertEquals(2, list.size());
1189        Assert.assertEquals(3, list.get(0).size());
1190        Assert.assertEquals(2, list.get(1).size());
1191      }
1192
1193      scan = new Scan();
1194      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
1195              .setMaxResultSize(4 * 1024 * 1024);
1196      try (ResultScanner scanner = table.getScanner(scan)) {
1197        List<Result> list = new ArrayList<>();
1198        while ((result = scanner.next()) != null) {
1199          list.add(result);
1200        }
1201        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1202        Assert.assertEquals(3, list.size());
1203        Assert.assertEquals(2, list.get(0).size());
1204        Assert.assertEquals(2, list.get(1).size());
1205        Assert.assertEquals(1, list.get(2).size());
1206      }
1207    }
1208  }
1209}