001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Optional;
032import java.util.Random;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.CellUtil;
043import org.apache.hadoop.hbase.Coprocessor;
044import org.apache.hadoop.hbase.HBaseTestingUtil;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HRegionLocation;
047import org.apache.hadoop.hbase.RegionMetrics;
048import org.apache.hadoop.hbase.ServerName;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
051import org.apache.hadoop.hbase.coprocessor.ObserverContext;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
054import org.apache.hadoop.hbase.coprocessor.RegionObserver;
055import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
056import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
057import org.apache.hadoop.hbase.ipc.RpcClient;
058import org.apache.hadoop.hbase.ipc.RpcClientFactory;
059import org.apache.hadoop.hbase.ipc.ServerRpcController;
060import org.apache.hadoop.hbase.regionserver.HRegion;
061import org.apache.hadoop.hbase.regionserver.HRegionServer;
062import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
063import org.apache.hadoop.hbase.regionserver.RegionScanner;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.junit.jupiter.api.AfterAll;
067import org.junit.jupiter.api.AfterEach;
068import org.junit.jupiter.api.BeforeEach;
069import org.junit.jupiter.api.Test;
070import org.junit.jupiter.api.TestInfo;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos;
077
078public class FromClientSide3TestBase {
079
080  private static final Logger LOG = LoggerFactory.getLogger(FromClientSide3TestBase.class);
081  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
082
083  private static int WAITTABLE_MILLIS;
084  private static byte[] FAMILY;
085  private static int SLAVES;
086  private static byte[] ROW;
087  private static byte[] ANOTHERROW;
088  private static byte[] QUALIFIER;
089  private static byte[] VALUE;
090  private static byte[] COL_QUAL;
091  private static byte[] VAL_BYTES;
092  private static byte[] ROW_BYTES;
093
094  private TableName tableName;
095
096  protected static void startCluster() throws Exception {
097    WAITTABLE_MILLIS = 10000;
098    FAMILY = Bytes.toBytes("testFamily");
099    SLAVES = 3;
100    ROW = Bytes.toBytes("testRow");
101    ANOTHERROW = Bytes.toBytes("anotherrow");
102    QUALIFIER = Bytes.toBytes("testQualifier");
103    VALUE = Bytes.toBytes("testValue");
104    COL_QUAL = Bytes.toBytes("f1");
105    VAL_BYTES = Bytes.toBytes("v1");
106    ROW_BYTES = Bytes.toBytes("r1");
107    TEST_UTIL.startMiniCluster(SLAVES);
108  }
109
110  @AfterAll
111  public static void shutdownCluster() throws Exception {
112    TEST_UTIL.shutdownMiniCluster();
113  }
114
115  @BeforeEach
116  public void setUp(TestInfo testInfo) throws Exception {
117    tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
118  }
119
120  @AfterEach
121  public void tearDown() throws Exception {
122    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
123      LOG.info("Tear down, remove table=" + htd.getTableName());
124      TEST_UTIL.deleteTable(htd.getTableName());
125    }
126  }
127
128  private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts) throws Exception {
129    Put put = new Put(row);
130    Random rand = ThreadLocalRandom.current();
131    for (int i = 0; i < nPuts; i++) {
132      byte[] qualifier = Bytes.toBytes(rand.nextInt());
133      byte[] value = Bytes.toBytes(rand.nextInt());
134      put.addColumn(family, qualifier, value);
135    }
136    table.put(put);
137  }
138
139  private void performMultiplePutAndFlush(Admin admin, Table table, byte[] row, byte[] family,
140    int nFlushes, int nPuts) throws Exception {
141    for (int i = 0; i < nFlushes; i++) {
142      randomCFPuts(table, row, family, nPuts);
143      admin.flush(table.getName());
144    }
145  }
146
147  private static List<Cell> toList(ResultScanner scanner) {
148    try {
149      List<Cell> cells = new ArrayList<>();
150      for (Result r : scanner) {
151        cells.addAll(r.listCells());
152      }
153      return cells;
154    } finally {
155      scanner.close();
156    }
157  }
158
159  @Test
160  public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException {
161    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
162      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
163      byte[] row = Bytes.toBytes("SpecifiedRow");
164      byte[] value0 = Bytes.toBytes("value_0");
165      byte[] value1 = Bytes.toBytes("value_1");
166      Put put = new Put(row);
167      put.addColumn(FAMILY, QUALIFIER, VALUE);
168      table.put(put);
169      Delete d = new Delete(row);
170      table.delete(d);
171      put = new Put(row);
172      put.addColumn(FAMILY, null, value0);
173      table.put(put);
174      put = new Put(row);
175      put.addColumn(FAMILY, null, value1);
176      table.put(put);
177      List<Cell> cells = toList(table.getScanner(new Scan()));
178      assertEquals(1, cells.size());
179      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
180
181      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
182      assertEquals(1, cells.size());
183      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
184
185      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
186      assertEquals(0, cells.size());
187
188      TEST_UTIL.getAdmin().flush(tableName);
189      cells = toList(table.getScanner(new Scan()));
190      assertEquals(1, cells.size());
191      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
192
193      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
194      assertEquals(1, cells.size());
195      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
196
197      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
198      assertEquals(0, cells.size());
199    }
200  }
201
202  @Test
203  public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException {
204    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
205      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
206      byte[] row = Bytes.toBytes("SpecifiedRow");
207      byte[] qual0 = Bytes.toBytes("qual0");
208      byte[] qual1 = Bytes.toBytes("qual1");
209      long now = EnvironmentEdgeManager.currentTime();
210      Delete d = new Delete(row, now);
211      table.delete(d);
212
213      Put put = new Put(row);
214      put.addColumn(FAMILY, null, now + 1, VALUE);
215      table.put(put);
216
217      put = new Put(row);
218      put.addColumn(FAMILY, qual1, now + 2, qual1);
219      table.put(put);
220
221      put = new Put(row);
222      put.addColumn(FAMILY, qual0, now + 3, qual0);
223      table.put(put);
224
225      Result r = table.get(new Get(row));
226      assertEquals(3, r.size(), r.toString());
227      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
228      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
229      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
230
231      TEST_UTIL.getAdmin().flush(tableName);
232      r = table.get(new Get(row));
233      assertEquals(3, r.size());
234      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
235      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
236      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
237    }
238  }
239
240  private int getStoreFileCount(Admin admin, ServerName serverName, RegionInfo region)
241    throws IOException {
242    for (RegionMetrics metrics : admin.getRegionMetrics(serverName, region.getTable())) {
243      if (Bytes.equals(region.getRegionName(), metrics.getRegionName())) {
244        return metrics.getStoreFileCount();
245      }
246    }
247    return 0;
248  }
249
250  // override the config settings at the CF level and ensure priority
251  @Test
252  public void testAdvancedConfigOverride() throws Exception {
253    /*
254     * Overall idea: (1) create 3 store files and issue a compaction. config's compaction.min == 3,
255     * so should work. (2) Increase the compaction.min toggle in the HTD to 5 and modify table. If
256     * we use the HTD value instead of the default config value, adding 3 files and issuing a
257     * compaction SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2 and modify
258     * table. The CF schema should override the Table schema and now cause a minor compaction.
259     */
260    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
261
262    try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 10)) {
263      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
264      Admin admin = TEST_UTIL.getAdmin();
265
266      // Create 3 store files.
267      byte[] row = Bytes.toBytes(ThreadLocalRandom.current().nextInt());
268      performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 100);
269
270      try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
271        // Verify we have multiple store files.
272        HRegionLocation loc = locator.getRegionLocation(row, true);
273        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) > 1);
274
275        // Issue a compaction request
276        admin.compact(tableName);
277
278        // poll wait for the compactions to happen
279        for (int i = 0; i < 10 * 1000 / 40; ++i) {
280          // The number of store files after compaction should be lesser.
281          loc = locator.getRegionLocation(row, true);
282          if (!loc.getRegion().isOffline()) {
283            if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1) {
284              break;
285            }
286          }
287          Thread.sleep(40);
288        }
289        // verify the compactions took place and that we didn't just time out
290        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1);
291
292        // change the compaction.min config option for this table to 5
293        LOG.info("hbase.hstore.compaction.min should now be 5");
294        TableDescriptor htd = TableDescriptorBuilder.newBuilder(table.getDescriptor())
295          .setValue("hbase.hstore.compaction.min", String.valueOf(5)).build();
296        admin.modifyTable(htd);
297        LOG.info("alter status finished");
298
299        // Create 3 more store files.
300        performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 10);
301
302        // Issue a compaction request
303        admin.compact(tableName);
304
305        // This time, the compaction request should not happen
306        Thread.sleep(10 * 1000);
307        loc = locator.getRegionLocation(row, true);
308        int sfCount = getStoreFileCount(admin, loc.getServerName(), loc.getRegion());
309        assertTrue(sfCount > 1);
310
311        // change an individual CF's config option to 2 & online schema update
312        LOG.info("hbase.hstore.compaction.min should now be 2");
313        htd = TableDescriptorBuilder.newBuilder(htd)
314          .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
315            .setValue("hbase.hstore.compaction.min", String.valueOf(2)).build())
316          .build();
317        admin.modifyTable(htd);
318        LOG.info("alter status finished");
319
320        // Issue a compaction request
321        admin.compact(tableName);
322
323        // poll wait for the compactions to happen
324        for (int i = 0; i < 10 * 1000 / 40; ++i) {
325          loc = locator.getRegionLocation(row, true);
326          try {
327            if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount) {
328              break;
329            }
330          } catch (Exception e) {
331            LOG.debug("Waiting for region to come online: "
332              + Bytes.toStringBinary(loc.getRegion().getRegionName()));
333          }
334          Thread.sleep(40);
335        }
336
337        // verify the compaction took place and that we didn't just time out
338        assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount);
339
340        // Finally, ensure that we can remove a custom config value after we made it
341        LOG.info("Removing CF config value");
342        LOG.info("hbase.hstore.compaction.min should now be 5");
343        htd = TableDescriptorBuilder.newBuilder(htd)
344          .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
345            .setValue("hbase.hstore.compaction.min", null).build())
346          .build();
347        admin.modifyTable(htd);
348        LOG.info("alter status finished");
349        assertNull(table.getDescriptor().getColumnFamily(FAMILY)
350          .getValue(Bytes.toBytes("hbase.hstore.compaction.min")));
351      }
352    }
353  }
354
355  @Test
356  public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException {
357    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
358      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
359      List<Put> actions = new ArrayList<>();
360      Object[] results = new Object[2];
361      // create an empty Put
362      Put put1 = new Put(ROW);
363      actions.add(put1);
364
365      Put put2 = new Put(ANOTHERROW);
366      put2.addColumn(FAMILY, QUALIFIER, VALUE);
367      actions.add(put2);
368
369      table.batch(actions, results);
370      fail("Empty Put should have failed the batch call");
371    } catch (IllegalArgumentException iae) {
372    }
373  }
374
375  // Test Table.batch with large amount of mutations against the same key.
376  // It used to trigger read lock's "Maximum lock count exceeded" Error.
377  @Test
378  public void testHTableWithLargeBatch() throws IOException, InterruptedException {
379    int sixtyFourK = 64 * 1024;
380    List<Put> actions = new ArrayList<>();
381    Object[] results = new Object[(sixtyFourK + 1) * 2];
382
383    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
384      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
385
386      for (int i = 0; i < sixtyFourK + 1; i++) {
387        Put put1 = new Put(ROW);
388        put1.addColumn(FAMILY, QUALIFIER, VALUE);
389        actions.add(put1);
390
391        Put put2 = new Put(ANOTHERROW);
392        put2.addColumn(FAMILY, QUALIFIER, VALUE);
393        actions.add(put2);
394      }
395
396      table.batch(actions, results);
397    }
398  }
399
400  @Test
401  public void testBatchWithRowMutation() throws Exception {
402    LOG.info("Starting testBatchWithRowMutation");
403    byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") };
404
405    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
406      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
407
408      RowMutations arm = RowMutations
409        .of(Collections.singletonList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE)));
410      Object[] batchResult = new Object[1];
411      table.batch(Arrays.asList(arm), batchResult);
412
413      Get g = new Get(ROW);
414      Result r = table.get(g);
415      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
416
417      arm = RowMutations.of(Arrays.asList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE),
418        new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0])));
419      table.batch(Arrays.asList(arm), batchResult);
420      r = table.get(g);
421      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
422      assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
423
424      // Test that we get the correct remote exception for RowMutations from batch()
425      try {
426        arm = RowMutations.of(Collections.singletonList(
427          new Put(ROW).addColumn(new byte[] { 'b', 'o', 'g', 'u', 's' }, QUALIFIERS[0], VALUE)));
428        table.batch(Arrays.asList(arm), batchResult);
429        fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
430      } catch (RetriesExhaustedException e) {
431        String msg = e.getMessage();
432        assertTrue(msg.contains("NoSuchColumnFamilyException"));
433      }
434    }
435  }
436
437  @Test
438  public void testBatchWithCheckAndMutate() throws Exception {
439    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
440      byte[] row1 = Bytes.toBytes("row1");
441      byte[] row2 = Bytes.toBytes("row2");
442      byte[] row3 = Bytes.toBytes("row3");
443      byte[] row4 = Bytes.toBytes("row4");
444      byte[] row5 = Bytes.toBytes("row5");
445      byte[] row6 = Bytes.toBytes("row6");
446      byte[] row7 = Bytes.toBytes("row7");
447
448      table
449        .put(Arrays.asList(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
450          new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
451          new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
452          new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
453          new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
454          new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
455          new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))));
456
457      CheckAndMutate checkAndMutate1 =
458        CheckAndMutate.newBuilder(row1).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
459          .build(new RowMutations(row1)
460            .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
461            .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
462            .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
463            .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
464      Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
465      RowMutations mutations =
466        new RowMutations(row3).add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
467          .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
468          .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
469          .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
470      CheckAndMutate checkAndMutate2 =
471        CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
472          .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
473      Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
474      CheckAndMutate checkAndMutate3 =
475        CheckAndMutate.newBuilder(row6).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
476          .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
477      CheckAndMutate checkAndMutate4 =
478        CheckAndMutate.newBuilder(row7).ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
479          .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
480
481      List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
482        checkAndMutate3, checkAndMutate4);
483      Object[] results = new Object[actions.size()];
484      table.batch(actions, results);
485
486      CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0];
487      assertTrue(checkAndMutateResult.isSuccess());
488      assertEquals(3L,
489        Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
490      assertEquals("d",
491        Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
492
493      assertEquals("b", Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
494
495      Result result = (Result) results[2];
496      assertTrue(result.getExists());
497      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
498      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
499
500      checkAndMutateResult = (CheckAndMutateResult) results[3];
501      assertFalse(checkAndMutateResult.isSuccess());
502      assertNull(checkAndMutateResult.getResult());
503
504      assertTrue(((Result) results[4]).isEmpty());
505
506      checkAndMutateResult = (CheckAndMutateResult) results[5];
507      assertTrue(checkAndMutateResult.isSuccess());
508      assertEquals(11,
509        Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("F"))));
510
511      checkAndMutateResult = (CheckAndMutateResult) results[6];
512      assertTrue(checkAndMutateResult.isSuccess());
513      assertEquals("gg",
514        Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("G"))));
515
516      result = table.get(new Get(row1));
517      assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
518      assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
519      assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
520      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
521
522      result = table.get(new Get(row3));
523      assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
524      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
525      assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
526      assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
527      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
528
529      result = table.get(new Get(row4));
530      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
531
532      result = table.get(new Get(row5));
533      assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
534
535      result = table.get(new Get(row6));
536      assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
537
538      result = table.get(new Get(row7));
539      assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
540    }
541  }
542
543  @Test
544  public void testHTableExistsMethodSingleRegionSingleGet()
545    throws IOException, InterruptedException {
546    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
547      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
548
549      // Test with a single region table.
550      Put put = new Put(ROW);
551      put.addColumn(FAMILY, QUALIFIER, VALUE);
552
553      Get get = new Get(ROW);
554
555      boolean exist = table.exists(get);
556      assertFalse(exist);
557
558      table.put(put);
559
560      exist = table.exists(get);
561      assertTrue(exist);
562    }
563  }
564
565  @Test
566  public void testHTableExistsMethodSingleRegionMultipleGets()
567    throws IOException, InterruptedException {
568    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
569      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
570
571      Put put = new Put(ROW);
572      put.addColumn(FAMILY, QUALIFIER, VALUE);
573      table.put(put);
574
575      List<Get> gets = new ArrayList<>();
576      gets.add(new Get(ROW));
577      gets.add(new Get(ANOTHERROW));
578
579      boolean[] results = table.exists(gets);
580      assertTrue(results[0]);
581      assertFalse(results[1]);
582    }
583  }
584
585  @Test
586  public void testHTableExistsBeforeGet() throws IOException, InterruptedException {
587    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
588      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
589
590      Put put = new Put(ROW);
591      put.addColumn(FAMILY, QUALIFIER, VALUE);
592      table.put(put);
593
594      Get get = new Get(ROW);
595
596      boolean exist = table.exists(get);
597      assertEquals(true, exist);
598
599      Result result = table.get(get);
600      assertEquals(false, result.isEmpty());
601      assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)));
602    }
603  }
604
605  @Test
606  public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException {
607    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
608      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
609
610      final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2"));
611      Put put = new Put(ROW);
612      put.addColumn(FAMILY, QUALIFIER, VALUE);
613      table.put(put);
614      put = new Put(ROW2);
615      put.addColumn(FAMILY, QUALIFIER, VALUE);
616      table.put(put);
617
618      Get get = new Get(ROW);
619      Get get2 = new Get(ROW2);
620      ArrayList<Get> getList = new ArrayList<>(2);
621      getList.add(get);
622      getList.add(get2);
623
624      boolean[] exists = table.exists(getList);
625      assertEquals(true, exists[0]);
626      assertEquals(true, exists[1]);
627
628      Result[] result = table.get(getList);
629      assertEquals(false, result[0].isEmpty());
630      assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER)));
631      assertEquals(false, result[1].isEmpty());
632      assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER)));
633    }
634  }
635
636  @Test
637  public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
638    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1,
639      new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
640      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
641
642      Put put = new Put(ROW);
643      put.addColumn(FAMILY, QUALIFIER, VALUE);
644
645      Get get = new Get(ROW);
646
647      boolean exist = table.exists(get);
648      assertFalse(exist);
649
650      table.put(put);
651
652      exist = table.exists(get);
653      assertTrue(exist);
654    }
655  }
656
657  @Test
658  public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
659    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1,
660      new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
661      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
662
663      Put put = new Put(ROW);
664      put.addColumn(FAMILY, QUALIFIER, VALUE);
665      table.put(put);
666
667      List<Get> gets = new ArrayList<>();
668      gets.add(new Get(ANOTHERROW));
669      gets.add(new Get(Bytes.add(ROW, new byte[] { 0x00 })));
670      gets.add(new Get(ROW));
671      gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 })));
672
673      LOG.info("Calling exists");
674      boolean[] results = table.exists(gets);
675      assertFalse(results[0]);
676      assertFalse(results[1]);
677      assertTrue(results[2]);
678      assertFalse(results[3]);
679
680      // Test with the first region.
681      put = new Put(new byte[] { 0x00 });
682      put.addColumn(FAMILY, QUALIFIER, VALUE);
683      table.put(put);
684
685      gets = new ArrayList<>();
686      gets.add(new Get(new byte[] { 0x00 }));
687      gets.add(new Get(new byte[] { 0x00, 0x00 }));
688      results = table.exists(gets);
689      assertTrue(results[0]);
690      assertFalse(results[1]);
691
692      // Test with the last region
693      put = new Put(new byte[] { (byte) 0xff, (byte) 0xff });
694      put.addColumn(FAMILY, QUALIFIER, VALUE);
695      table.put(put);
696
697      gets = new ArrayList<>();
698      gets.add(new Get(new byte[] { (byte) 0xff }));
699      gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff }));
700      gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff }));
701      results = table.exists(gets);
702      assertFalse(results[0]);
703      assertTrue(results[1]);
704      assertFalse(results[2]);
705    }
706  }
707
708  @Test
709  public void testGetEmptyRow() throws Exception {
710    // Create a table and put in 1 row
711    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
712      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
713
714      Put put = new Put(ROW_BYTES);
715      put.addColumn(FAMILY, COL_QUAL, VAL_BYTES);
716      table.put(put);
717
718      // Try getting the row with an empty row key
719      Result res = null;
720      try {
721        res = table.get(new Get(new byte[0]));
722        fail();
723      } catch (IllegalArgumentException e) {
724        // Expected.
725      }
726      assertTrue(res == null);
727      res = table.get(new Get(Bytes.toBytes("r1-not-exist")));
728      assertTrue(res.isEmpty() == true);
729      res = table.get(new Get(ROW_BYTES));
730      assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
731    }
732  }
733
734  @Test
735  public void testConnectionDefaultUsesCodec() throws Exception {
736    try (
737      RpcClient client = RpcClientFactory.createClient(TEST_UTIL.getConfiguration(), "cluster")) {
738      assertTrue(client.hasCellBlockSupport());
739    }
740  }
741
742  @Test
743  public void testPutWithPreBatchMutate() throws Exception {
744    testPreBatchMutate(tableName, () -> {
745      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
746        Put put = new Put(ROW);
747        put.addColumn(FAMILY, QUALIFIER, VALUE);
748        t.put(put);
749      } catch (IOException ex) {
750        throw new RuntimeException(ex);
751      }
752    });
753  }
754
755  @Test
756  public void testRowMutationsWithPreBatchMutate() throws Exception {
757    testPreBatchMutate(tableName, () -> {
758      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
759        RowMutations rm = new RowMutations(ROW, 1);
760        Put put = new Put(ROW);
761        put.addColumn(FAMILY, QUALIFIER, VALUE);
762        rm.add(put);
763        t.mutateRow(rm);
764      } catch (IOException ex) {
765        throw new RuntimeException(ex);
766      }
767    });
768  }
769
770  private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception {
771    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
772      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
773      .setCoprocessor(WaitingForScanObserver.class.getName()).build();
774    TEST_UTIL.getAdmin().createTable(tableDescriptor);
775    // Don't use waitTableAvailable(), because the scanner will mess up the co-processor
776
777    ExecutorService service = Executors.newFixedThreadPool(2);
778    service.execute(rn);
779    final List<Cell> cells = new ArrayList<>();
780    service.execute(() -> {
781      try {
782        // waiting for update.
783        TimeUnit.SECONDS.sleep(3);
784        try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
785          Scan scan = new Scan();
786          try (ResultScanner scanner = t.getScanner(scan)) {
787            for (Result r : scanner) {
788              cells.addAll(Arrays.asList(r.rawCells()));
789            }
790          }
791        }
792      } catch (IOException | InterruptedException ex) {
793        throw new RuntimeException(ex);
794      }
795    });
796    service.shutdown();
797    service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
798    assertEquals(0, cells.size(), "The write is blocking by RegionObserver#postBatchMutate"
799      + ", so the data is invisible to reader");
800    TEST_UTIL.deleteTable(tableName);
801  }
802
803  @Test
804  public void testLockLeakWithDelta() throws Exception, Throwable {
805    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
806      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
807      .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
808      .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
809    TEST_UTIL.getAdmin().createTable(tableDescriptor);
810    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
811
812    // new a connection for lower retry number.
813    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
814    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
815    try (Connection con = ConnectionFactory.createConnection(copy)) {
816      HRegion region = (HRegion) find(tableName);
817      region.setTimeoutForWriteLock(10);
818      ExecutorService putService = Executors.newSingleThreadExecutor();
819      putService.execute(() -> {
820        try (Table table = con.getTable(tableName)) {
821          Put put = new Put(ROW);
822          put.addColumn(FAMILY, QUALIFIER, VALUE);
823          // the put will be blocked by WaitingForMultiMutationsObserver.
824          table.put(put);
825        } catch (IOException ex) {
826          throw new RuntimeException(ex);
827        }
828      });
829      ExecutorService appendService = Executors.newSingleThreadExecutor();
830      appendService.execute(() -> {
831        Append append = new Append(ROW);
832        append.addColumn(FAMILY, QUALIFIER, VALUE);
833        try (Table table = con.getTable(tableName)) {
834          table.append(append);
835          fail("The APPEND should fail because the target lock is blocked by previous put");
836        } catch (Exception ex) {
837        }
838      });
839      appendService.shutdown();
840      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
841      WaitingForMultiMutationsObserver observer =
842        find(tableName, WaitingForMultiMutationsObserver.class);
843      observer.latch.countDown();
844      putService.shutdown();
845      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
846      try (Table table = con.getTable(tableName)) {
847        Result r = table.get(new Get(ROW));
848        assertFalse(r.isEmpty());
849        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
850      }
851    }
852    HRegion region = (HRegion) find(tableName);
853    int readLockCount = region.getReadLockCount();
854    LOG.info("readLockCount:" + readLockCount);
855    assertEquals(0, readLockCount);
856  }
857
858  @Test
859  public void testMultiRowMutations() throws Exception, Throwable {
860    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
861      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
862      .setCoprocessor(MultiRowMutationEndpoint.class.getName())
863      .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
864      .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
865    TEST_UTIL.getAdmin().createTable(tableDescriptor);
866    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
867
868    // new a connection for lower retry number.
869    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
870    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
871    try (Connection con = ConnectionFactory.createConnection(copy)) {
872      byte[] row = Bytes.toBytes("ROW-0");
873      byte[] rowLocked = Bytes.toBytes("ROW-1");
874      byte[] value0 = Bytes.toBytes("VALUE-0");
875      byte[] value1 = Bytes.toBytes("VALUE-1");
876      byte[] value2 = Bytes.toBytes("VALUE-2");
877      assertNoLocks(tableName);
878      ExecutorService putService = Executors.newSingleThreadExecutor();
879      putService.execute(() -> {
880        try (Table table = con.getTable(tableName)) {
881          Put put0 = new Put(rowLocked);
882          put0.addColumn(FAMILY, QUALIFIER, value0);
883          // the put will be blocked by WaitingForMultiMutationsObserver.
884          table.put(put0);
885        } catch (IOException ex) {
886          throw new RuntimeException(ex);
887        }
888      });
889      ExecutorService cpService = Executors.newSingleThreadExecutor();
890      AtomicBoolean exceptionDuringMutateRows = new AtomicBoolean();
891      cpService.execute(() -> {
892        Put put1 = new Put(row);
893        Put put2 = new Put(rowLocked);
894        put1.addColumn(FAMILY, QUALIFIER, value1);
895        put2.addColumn(FAMILY, QUALIFIER, value2);
896        try (Table table = con.getTable(tableName)) {
897          MultiRowMutationProtos.MutateRowsRequest request =
898            MultiRowMutationProtos.MutateRowsRequest.newBuilder()
899              .addMutationRequest(
900                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put1))
901              .addMutationRequest(
902                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put2))
903              .build();
904          table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, ROW, ROW,
905            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
906              ServerRpcController controller = new ServerRpcController();
907              CoprocessorRpcUtils.BlockingRpcCallback<
908                MultiRowMutationProtos.MutateRowsResponse> rpcCallback =
909                  new CoprocessorRpcUtils.BlockingRpcCallback<>();
910              exe.mutateRows(controller, request, rpcCallback);
911              if (
912                controller.failedOnException()
913                  && !(controller.getFailedOn() instanceof UnknownProtocolException)
914              ) {
915                exceptionDuringMutateRows.set(true);
916              }
917              return rpcCallback.get();
918            });
919        } catch (Throwable ex) {
920          LOG.error("encountered " + ex);
921        }
922      });
923      cpService.shutdown();
924      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
925      WaitingForMultiMutationsObserver observer =
926        find(tableName, WaitingForMultiMutationsObserver.class);
927      observer.latch.countDown();
928      putService.shutdown();
929      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
930      try (Table table = con.getTable(tableName)) {
931        Get g0 = new Get(row);
932        Get g1 = new Get(rowLocked);
933        Result r0 = table.get(g0);
934        Result r1 = table.get(g1);
935        assertTrue(r0.isEmpty());
936        assertFalse(r1.isEmpty());
937        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
938      }
939      assertNoLocks(tableName);
940      if (!exceptionDuringMutateRows.get()) {
941        fail("This cp should fail because the target lock is blocked by previous put");
942      }
943    }
944  }
945
946  /**
947   * A test case for issue HBASE-17482 After combile seqid with mvcc readpoint, seqid/mvcc is
948   * acquired and stamped onto cells in the append thread, a countdown latch is used to ensure that
949   * happened before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) make
950   * the seqid/mvcc acquirement in handler thread and stamping in append thread No countdown latch
951   * to assure cells in memstore are stamped with seqid/mvcc. If cells without mvcc(A.K.A mvcc=0)
952   * are put into memstore, then a scanner with a smaller readpoint can see these data, which
953   * disobey the multi version concurrency control rules. This test case is to reproduce this
954   * scenario.
955   */
956  @Test
957  public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException {
958    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
959      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
960      // put two row first to init the scanner
961      Put put = new Put(Bytes.toBytes("0"));
962      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
963      table.put(put);
964      put = new Put(Bytes.toBytes("00"));
965      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
966      table.put(put);
967      Scan scan = new Scan();
968      scan.setTimeRange(0, Long.MAX_VALUE);
969      scan.setCaching(1);
970      ResultScanner scanner = table.getScanner(scan);
971      int rowNum = scanner.next() != null ? 1 : 0;
972      // the started scanner shouldn't see the rows put below
973      for (int i = 1; i < 1000; i++) {
974        put = new Put(Bytes.toBytes(String.valueOf(i)));
975        put.setDurability(Durability.ASYNC_WAL);
976        put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i));
977        table.put(put);
978      }
979      for (Result result : scanner) {
980        rowNum++;
981      }
982      // scanner should only see two rows
983      assertEquals(2, rowNum);
984      scanner = table.getScanner(scan);
985      rowNum = 0;
986      for (Result result : scanner) {
987        rowNum++;
988      }
989      // the new scanner should see all rows
990      assertEquals(1001, rowNum);
991    }
992  }
993
994  @Test
995  public void testPutThenGetWithMultipleThreads() throws Exception {
996    final int THREAD_NUM = 20;
997    final int ROUND_NUM = 10;
998    for (int round = 0; round < ROUND_NUM; round++) {
999      ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
1000      final AtomicInteger successCnt = new AtomicInteger(0);
1001      try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
1002        TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1003
1004        for (int i = 0; i < THREAD_NUM; i++) {
1005          final int index = i;
1006          Thread t = new Thread(new Runnable() {
1007
1008            @Override
1009            public void run() {
1010              final byte[] row = Bytes.toBytes("row-" + index);
1011              final byte[] value = Bytes.toBytes("v" + index);
1012              try {
1013                Put put = new Put(row);
1014                put.addColumn(FAMILY, QUALIFIER, value);
1015                ht.put(put);
1016                Get get = new Get(row);
1017                Result result = ht.get(get);
1018                byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
1019                if (Bytes.equals(value, returnedValue)) {
1020                  successCnt.getAndIncrement();
1021                } else {
1022                  LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
1023                    + ", returned value: "
1024                    + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
1025                }
1026              } catch (Throwable e) {
1027                // do nothing
1028              }
1029            }
1030          });
1031          threads.add(t);
1032        }
1033        for (Thread t : threads) {
1034          t.start();
1035        }
1036        for (Thread t : threads) {
1037          t.join();
1038        }
1039        assertEquals(THREAD_NUM, successCnt.get(), "Not equal in round " + round);
1040      }
1041      TEST_UTIL.deleteTable(tableName);
1042    }
1043  }
1044
1045  private static void assertNoLocks(final TableName tableName)
1046    throws IOException, InterruptedException {
1047    HRegion region = (HRegion) find(tableName);
1048    assertEquals(0, region.getLockedRows().size());
1049  }
1050
1051  private static HRegion find(final TableName tableName) throws IOException, InterruptedException {
1052    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
1053    List<HRegion> regions = rs.getRegions(tableName);
1054    assertEquals(1, regions.size());
1055    return regions.get(0);
1056  }
1057
1058  private static <T extends RegionObserver> T find(final TableName tableName, Class<T> clz)
1059    throws IOException, InterruptedException {
1060    HRegion region = find(tableName);
1061    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
1062    assertTrue(clz.isInstance(cp), "The cp instance should be " + clz.getName()
1063      + ", current instance is " + cp.getClass().getName());
1064    return clz.cast(cp);
1065  }
1066
1067  public static class WaitingForMultiMutationsObserver
1068    implements RegionCoprocessor, RegionObserver {
1069    final CountDownLatch latch = new CountDownLatch(1);
1070
1071    @Override
1072    public Optional<RegionObserver> getRegionObserver() {
1073      return Optional.of(this);
1074    }
1075
1076    @Override
1077    public void postBatchMutate(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1078      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1079      try {
1080        latch.await();
1081      } catch (InterruptedException ex) {
1082        throw new IOException(ex);
1083      }
1084    }
1085  }
1086
1087  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
1088    private final CountDownLatch latch = new CountDownLatch(1);
1089
1090    @Override
1091    public Optional<RegionObserver> getRegionObserver() {
1092      return Optional.of(this);
1093    }
1094
1095    @Override
1096    public void postBatchMutate(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1097      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1098      try {
1099        // waiting for scanner
1100        latch.await();
1101      } catch (InterruptedException ex) {
1102        throw new IOException(ex);
1103      }
1104    }
1105
1106    @Override
1107    public RegionScanner postScannerOpen(
1108      final ObserverContext<? extends RegionCoprocessorEnvironment> e, final Scan scan,
1109      final RegionScanner s) throws IOException {
1110      latch.countDown();
1111      return s;
1112    }
1113  }
1114
1115  static byte[] generateHugeValue(int size) {
1116    Random rand = ThreadLocalRandom.current();
1117    byte[] value = new byte[size];
1118    for (int i = 0; i < value.length; i++) {
1119      value[i] = (byte) rand.nextInt(256);
1120    }
1121    return value;
1122  }
1123
1124  @Test
1125  public void testScanWithBatchSizeReturnIncompleteCells()
1126    throws IOException, InterruptedException {
1127    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
1128      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
1129      .build();
1130    try (Table table = TEST_UTIL.createTable(hd, null)) {
1131      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1132
1133      Put put = new Put(ROW);
1134      put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
1135      table.put(put);
1136
1137      put = new Put(ROW);
1138      put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
1139      table.put(put);
1140
1141      for (int i = 2; i < 5; i++) {
1142        for (int version = 0; version < 2; version++) {
1143          put = new Put(ROW);
1144          put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
1145          table.put(put);
1146        }
1147      }
1148
1149      Scan scan = new Scan();
1150      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
1151        .setMaxResultSize(4 * 1024 * 1024);
1152      Result result;
1153      try (ResultScanner scanner = table.getScanner(scan)) {
1154        List<Result> list = new ArrayList<>();
1155        /*
1156         * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The
1157         * second scan rpc should return a result with 3 cells, because reach the batch limit = 3;
1158         * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
1159         * moreResultsInRegion also would be false. Finally, the client should collect all the cells
1160         * into two result: 2+3 -> 3+2;
1161         */
1162        while ((result = scanner.next()) != null) {
1163          list.add(result);
1164        }
1165
1166        assertEquals(5, list.stream().mapToInt(Result::size).sum());
1167        assertEquals(2, list.size());
1168        assertEquals(3, list.get(0).size());
1169        assertEquals(2, list.get(1).size());
1170      }
1171
1172      scan = new Scan();
1173      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
1174        .setMaxResultSize(4 * 1024 * 1024);
1175      try (ResultScanner scanner = table.getScanner(scan)) {
1176        List<Result> list = new ArrayList<>();
1177        while ((result = scanner.next()) != null) {
1178          list.add(result);
1179        }
1180        assertEquals(5, list.stream().mapToInt(Result::size).sum());
1181        assertEquals(3, list.size());
1182        assertEquals(2, list.get(0).size());
1183        assertEquals(2, list.get(1).size());
1184        assertEquals(1, list.get(2).size());
1185      }
1186    }
1187  }
1188}