001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.junit.jupiter.api.Assertions.fail;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.List;
029import java.util.Optional;
030import java.util.Random;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.ThreadLocalRandom;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicBoolean;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.Coprocessor;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
044import org.apache.hadoop.hbase.coprocessor.ObserverContext;
045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
047import org.apache.hadoop.hbase.coprocessor.RegionObserver;
048import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
049import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
050import org.apache.hadoop.hbase.ipc.ServerRpcController;
051import org.apache.hadoop.hbase.regionserver.HRegion;
052import org.apache.hadoop.hbase.regionserver.HRegionServer;
053import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
054import org.apache.hadoop.hbase.regionserver.RegionScanner;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.junit.jupiter.api.TestTemplate;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
062
063import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos;
066
067public class FromClientSideTest3 extends FromClientSideTestBase {
068
069  protected FromClientSideTest3(Class<? extends ConnectionRegistry> registryImpl,
070    int numHedgedReqs) {
071    super(registryImpl, numHedgedReqs);
072  }
073
074  private static final Logger LOG = LoggerFactory.getLogger(FromClientSideTest3.class);
075
076  private static int WAITTABLE_MILLIS;
077
078  protected static void startCluster(Class<?>... cps) throws Exception {
079    WAITTABLE_MILLIS = 10000;
080    SLAVES = 3;
081    initialize(cps);
082  }
083
084  private static List<Cell> toList(ResultScanner scanner) {
085    try {
086      List<Cell> cells = new ArrayList<>();
087      for (Result r : scanner) {
088        cells.addAll(r.listCells());
089      }
090      return cells;
091    } finally {
092      scanner.close();
093    }
094  }
095
096  @TestTemplate
097  public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException {
098    TEST_UTIL.createTable(tableName, new byte[][] { FAMILY });
099    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
100    try (Connection conn = getConnection(); Table table = conn.getTable(tableName)) {
101      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
102      byte[] row = Bytes.toBytes("SpecifiedRow");
103      byte[] value0 = Bytes.toBytes("value_0");
104      byte[] value1 = Bytes.toBytes("value_1");
105      Put put = new Put(row);
106      put.addColumn(FAMILY, QUALIFIER, VALUE);
107      table.put(put);
108      Delete d = new Delete(row);
109      table.delete(d);
110      put = new Put(row);
111      put.addColumn(FAMILY, null, value0);
112      table.put(put);
113      put = new Put(row);
114      put.addColumn(FAMILY, null, value1);
115      table.put(put);
116      List<Cell> cells = toList(table.getScanner(new Scan()));
117      assertEquals(1, cells.size());
118      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
119
120      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
121      assertEquals(1, cells.size());
122      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
123
124      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
125      assertEquals(0, cells.size());
126
127      TEST_UTIL.getAdmin().flush(tableName);
128      cells = toList(table.getScanner(new Scan()));
129      assertEquals(1, cells.size());
130      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
131
132      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
133      assertEquals(1, cells.size());
134      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
135
136      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
137      assertEquals(0, cells.size());
138    }
139  }
140
141  @TestTemplate
142  public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException {
143    TEST_UTIL.createTable(tableName, new byte[][] { FAMILY });
144    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
145    try (Connection conn = getConnection(); Table table = conn.getTable(tableName)) {
146      byte[] row = Bytes.toBytes("SpecifiedRow");
147      byte[] qual0 = Bytes.toBytes("qual0");
148      byte[] qual1 = Bytes.toBytes("qual1");
149      long now = EnvironmentEdgeManager.currentTime();
150      Delete d = new Delete(row, now);
151      table.delete(d);
152
153      Put put = new Put(row);
154      put.addColumn(FAMILY, null, now + 1, VALUE);
155      table.put(put);
156
157      put = new Put(row);
158      put.addColumn(FAMILY, qual1, now + 2, qual1);
159      table.put(put);
160
161      put = new Put(row);
162      put.addColumn(FAMILY, qual0, now + 3, qual0);
163      table.put(put);
164
165      Result r = table.get(new Get(row));
166      assertEquals(3, r.size(), r.toString());
167      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
168      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
169      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
170
171      TEST_UTIL.getAdmin().flush(tableName);
172      r = table.get(new Get(row));
173      assertEquals(3, r.size());
174      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
175      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
176      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
177    }
178  }
179
180  @TestTemplate
181  public void testPutWithPreBatchMutate() throws Exception {
182    testPreBatchMutate(tableName, () -> {
183      try (Connection conn = getConnection(); Table t = conn.getTable(tableName)) {
184        Put put = new Put(ROW);
185        put.addColumn(FAMILY, QUALIFIER, VALUE);
186        t.put(put);
187      } catch (IOException ex) {
188        throw new RuntimeException(ex);
189      }
190    });
191  }
192
193  @TestTemplate
194  public void testRowMutationsWithPreBatchMutate() throws Exception {
195    testPreBatchMutate(tableName, () -> {
196      try (Connection conn = getConnection(); Table t = conn.getTable(tableName)) {
197        RowMutations rm = new RowMutations(ROW, 1);
198        Put put = new Put(ROW);
199        put.addColumn(FAMILY, QUALIFIER, VALUE);
200        rm.add(put);
201        t.mutateRow(rm);
202      } catch (IOException ex) {
203        throw new RuntimeException(ex);
204      }
205    });
206  }
207
208  private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception {
209    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
210      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
211      .setCoprocessor(WaitingForScanObserver.class.getName()).build();
212    try (Connection conn = getConnection(); Admin admin = conn.getAdmin()) {
213      admin.createTable(tableDescriptor);
214      // Don't use waitTableAvailable(), because the scanner will mess up the co-processor
215
216      ExecutorService service = Executors.newFixedThreadPool(2);
217      service.execute(rn);
218      final List<Cell> cells = new ArrayList<>();
219      service.execute(() -> {
220        try {
221          // waiting for update.
222          TimeUnit.SECONDS.sleep(3);
223          try (Table t = conn.getTable(tableName)) {
224            Scan scan = new Scan();
225            try (ResultScanner scanner = t.getScanner(scan)) {
226              for (Result r : scanner) {
227                cells.addAll(Arrays.asList(r.rawCells()));
228              }
229            }
230          }
231        } catch (IOException | InterruptedException ex) {
232          throw new RuntimeException(ex);
233        }
234      });
235      service.shutdown();
236      service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
237      assertEquals(0, cells.size(), "The write is blocking by RegionObserver#postBatchMutate"
238        + ", so the data is invisible to reader");
239    }
240  }
241
242  @TestTemplate
243  public void testLockLeakWithDelta() throws Exception, Throwable {
244    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
245      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
246      .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
247      .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
248    TEST_UTIL.getAdmin().createTable(tableDescriptor);
249    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
250
251    // new a connection for lower retry number.
252    Configuration copy = getClientConf();
253    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
254    try (Connection con = ConnectionFactory.createConnection(copy)) {
255      HRegion region = (HRegion) find(tableName);
256      region.setTimeoutForWriteLock(10);
257      ExecutorService putService = Executors.newSingleThreadExecutor();
258      putService.execute(() -> {
259        try (Table table = con.getTable(tableName)) {
260          Put put = new Put(ROW);
261          put.addColumn(FAMILY, QUALIFIER, VALUE);
262          // the put will be blocked by WaitingForMultiMutationsObserver.
263          table.put(put);
264        } catch (IOException ex) {
265          throw new RuntimeException(ex);
266        }
267      });
268      ExecutorService appendService = Executors.newSingleThreadExecutor();
269      appendService.execute(() -> {
270        Append append = new Append(ROW);
271        append.addColumn(FAMILY, QUALIFIER, VALUE);
272        try (Table table = con.getTable(tableName)) {
273          table.append(append);
274          fail("The APPEND should fail because the target lock is blocked by previous put");
275        } catch (Exception ex) {
276        }
277      });
278      appendService.shutdown();
279      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
280      WaitingForMultiMutationsObserver observer =
281        find(tableName, WaitingForMultiMutationsObserver.class);
282      observer.latch.countDown();
283      putService.shutdown();
284      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
285      try (Table table = con.getTable(tableName)) {
286        Result r = table.get(new Get(ROW));
287        assertFalse(r.isEmpty());
288        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
289      }
290    }
291    HRegion region = (HRegion) find(tableName);
292    int readLockCount = region.getReadLockCount();
293    LOG.info("readLockCount:" + readLockCount);
294    assertEquals(0, readLockCount);
295  }
296
297  @TestTemplate
298  public void testMultiRowMutations() throws Exception, Throwable {
299    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
300      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
301      .setCoprocessor(MultiRowMutationEndpoint.class.getName())
302      .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
303      .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
304    TEST_UTIL.getAdmin().createTable(tableDescriptor);
305    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
306
307    // new a connection for lower retry number.
308    Configuration copy = getClientConf();
309    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
310    try (Connection con = ConnectionFactory.createConnection(copy)) {
311      byte[] row = Bytes.toBytes("ROW-0");
312      byte[] rowLocked = Bytes.toBytes("ROW-1");
313      byte[] value0 = Bytes.toBytes("VALUE-0");
314      byte[] value1 = Bytes.toBytes("VALUE-1");
315      byte[] value2 = Bytes.toBytes("VALUE-2");
316      assertNoLocks(tableName);
317      ExecutorService putService = Executors.newSingleThreadExecutor();
318      putService.execute(() -> {
319        try (Table table = con.getTable(tableName)) {
320          Put put0 = new Put(rowLocked);
321          put0.addColumn(FAMILY, QUALIFIER, value0);
322          // the put will be blocked by WaitingForMultiMutationsObserver.
323          table.put(put0);
324        } catch (IOException ex) {
325          throw new RuntimeException(ex);
326        }
327      });
328      ExecutorService cpService = Executors.newSingleThreadExecutor();
329      AtomicBoolean exceptionDuringMutateRows = new AtomicBoolean();
330      cpService.execute(() -> {
331        Put put1 = new Put(row);
332        Put put2 = new Put(rowLocked);
333        put1.addColumn(FAMILY, QUALIFIER, value1);
334        put2.addColumn(FAMILY, QUALIFIER, value2);
335        try (Table table = con.getTable(tableName)) {
336          MultiRowMutationProtos.MutateRowsRequest request =
337            MultiRowMutationProtos.MutateRowsRequest.newBuilder()
338              .addMutationRequest(
339                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put1))
340              .addMutationRequest(
341                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put2))
342              .build();
343          table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, ROW, ROW,
344            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
345              ServerRpcController controller = new ServerRpcController();
346              CoprocessorRpcUtils.BlockingRpcCallback<
347                MultiRowMutationProtos.MutateRowsResponse> rpcCallback =
348                  new CoprocessorRpcUtils.BlockingRpcCallback<>();
349              exe.mutateRows(controller, request, rpcCallback);
350              if (
351                controller.failedOnException()
352                  && !(controller.getFailedOn() instanceof UnknownProtocolException)
353              ) {
354                exceptionDuringMutateRows.set(true);
355              }
356              return rpcCallback.get();
357            });
358        } catch (Throwable ex) {
359          LOG.error("encountered " + ex);
360        }
361      });
362      cpService.shutdown();
363      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
364      WaitingForMultiMutationsObserver observer =
365        find(tableName, WaitingForMultiMutationsObserver.class);
366      observer.latch.countDown();
367      putService.shutdown();
368      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
369      try (Table table = con.getTable(tableName)) {
370        Get g0 = new Get(row);
371        Get g1 = new Get(rowLocked);
372        Result r0 = table.get(g0);
373        Result r1 = table.get(g1);
374        assertTrue(r0.isEmpty());
375        assertFalse(r1.isEmpty());
376        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
377      }
378      assertNoLocks(tableName);
379      if (!exceptionDuringMutateRows.get()) {
380        fail("This cp should fail because the target lock is blocked by previous put");
381      }
382    }
383  }
384
385  /**
386   * A test case for issue HBASE-17482 After combile seqid with mvcc readpoint, seqid/mvcc is
387   * acquired and stamped onto cells in the append thread, a countdown latch is used to ensure that
388   * happened before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) make
389   * the seqid/mvcc acquirement in handler thread and stamping in append thread No countdown latch
390   * to assure cells in memstore are stamped with seqid/mvcc. If cells without mvcc(A.K.A mvcc=0)
391   * are put into memstore, then a scanner with a smaller readpoint can see these data, which
392   * disobey the multi version concurrency control rules. This test case is to reproduce this
393   * scenario.
394   */
395  @TestTemplate
396  public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException {
397    TEST_UTIL.createTable(tableName, new byte[][] { FAMILY });
398    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
399    try (Connection conn = getConnection(); Table table = conn.getTable(tableName)) {
400      // put two row first to init the scanner
401      Put put = new Put(Bytes.toBytes("0"));
402      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
403      table.put(put);
404      put = new Put(Bytes.toBytes("00"));
405      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
406      table.put(put);
407      Scan scan = new Scan();
408      scan.setTimeRange(0, Long.MAX_VALUE);
409      scan.setCaching(1);
410      try (ResultScanner scanner = table.getScanner(scan)) {
411        int rowNum = scanner.next() != null ? 1 : 0;
412        // the started scanner shouldn't see the rows put below
413        for (int i = 1; i < 1000; i++) {
414          put = new Put(Bytes.toBytes(String.valueOf(i)));
415          put.setDurability(Durability.ASYNC_WAL);
416          put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i));
417          table.put(put);
418        }
419        rowNum += Iterables.size(scanner);
420        // scanner should only see two rows
421        assertEquals(2, rowNum);
422      }
423      try (ResultScanner scanner = table.getScanner(scan)) {
424        int rowNum = Iterables.size(scanner);
425        // the new scanner should see all rows
426        assertEquals(1001, rowNum);
427      }
428    }
429  }
430
431  private static void assertNoLocks(final TableName tableName)
432    throws IOException, InterruptedException {
433    HRegion region = (HRegion) find(tableName);
434    assertEquals(0, region.getLockedRows().size());
435  }
436
437  private static HRegion find(final TableName tableName) throws IOException, InterruptedException {
438    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
439    List<HRegion> regions = rs.getRegions(tableName);
440    assertEquals(1, regions.size());
441    return regions.get(0);
442  }
443
444  private static <T extends RegionObserver> T find(final TableName tableName, Class<T> clz)
445    throws IOException, InterruptedException {
446    HRegion region = find(tableName);
447    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
448    assertTrue(clz.isInstance(cp), "The cp instance should be " + clz.getName()
449      + ", current instance is " + cp.getClass().getName());
450    return clz.cast(cp);
451  }
452
453  public static class WaitingForMultiMutationsObserver
454    implements RegionCoprocessor, RegionObserver {
455    final CountDownLatch latch = new CountDownLatch(1);
456
457    @Override
458    public Optional<RegionObserver> getRegionObserver() {
459      return Optional.of(this);
460    }
461
462    @Override
463    public void postBatchMutate(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
464      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
465      try {
466        latch.await();
467      } catch (InterruptedException ex) {
468        throw new IOException(ex);
469      }
470    }
471  }
472
473  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
474    private final CountDownLatch latch = new CountDownLatch(1);
475
476    @Override
477    public Optional<RegionObserver> getRegionObserver() {
478      return Optional.of(this);
479    }
480
481    @Override
482    public void postBatchMutate(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
483      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
484      try {
485        // waiting for scanner
486        latch.await();
487      } catch (InterruptedException ex) {
488        throw new IOException(ex);
489      }
490    }
491
492    @Override
493    public RegionScanner postScannerOpen(
494      final ObserverContext<? extends RegionCoprocessorEnvironment> e, final Scan scan,
495      final RegionScanner s) throws IOException {
496      latch.countDown();
497      return s;
498    }
499  }
500
501  static byte[] generateHugeValue(int size) {
502    Random rand = ThreadLocalRandom.current();
503    byte[] value = new byte[size];
504    for (int i = 0; i < value.length; i++) {
505      value[i] = (byte) rand.nextInt(256);
506    }
507    return value;
508  }
509
510  @TestTemplate
511  public void testScanWithBatchSizeReturnIncompleteCells()
512    throws IOException, InterruptedException {
513    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
514      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
515      .build();
516    try (Table table = TEST_UTIL.createTable(hd, null)) {
517      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
518
519      Put put = new Put(ROW);
520      put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
521      table.put(put);
522
523      put = new Put(ROW);
524      put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
525      table.put(put);
526
527      for (int i = 2; i < 5; i++) {
528        for (int version = 0; version < 2; version++) {
529          put = new Put(ROW);
530          put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
531          table.put(put);
532        }
533      }
534
535      Scan scan = new Scan();
536      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
537        .setMaxResultSize(4 * 1024 * 1024);
538      Result result;
539      try (ResultScanner scanner = table.getScanner(scan)) {
540        List<Result> list = new ArrayList<>();
541        /*
542         * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The
543         * second scan rpc should return a result with 3 cells, because reach the batch limit = 3;
544         * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
545         * moreResultsInRegion also would be false. Finally, the client should collect all the cells
546         * into two result: 2+3 -> 3+2;
547         */
548        while ((result = scanner.next()) != null) {
549          list.add(result);
550        }
551
552        assertEquals(5, list.stream().mapToInt(Result::size).sum());
553        assertEquals(2, list.size());
554        assertEquals(3, list.get(0).size());
555        assertEquals(2, list.get(1).size());
556      }
557
558      scan = new Scan();
559      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
560        .setMaxResultSize(4 * 1024 * 1024);
561      try (ResultScanner scanner = table.getScanner(scan)) {
562        List<Result> list = new ArrayList<>();
563        while ((result = scanner.next()) != null) {
564          list.add(result);
565        }
566        assertEquals(5, list.stream().mapToInt(Result::size).sum());
567        assertEquals(3, list.size());
568        assertEquals(2, list.get(0).size());
569        assertEquals(2, list.get(1).size());
570        assertEquals(1, list.get(2).size());
571      }
572    }
573  }
574}