001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.junit.jupiter.api.Assertions.fail; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.List; 029import java.util.Optional; 030import java.util.Random; 031import java.util.concurrent.CountDownLatch; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Executors; 034import java.util.concurrent.ThreadLocalRandom; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicBoolean; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.CellUtil; 040import org.apache.hadoop.hbase.Coprocessor; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 044import org.apache.hadoop.hbase.coprocessor.ObserverContext; 045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 047import org.apache.hadoop.hbase.coprocessor.RegionObserver; 048import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 049import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 050import org.apache.hadoop.hbase.ipc.ServerRpcController; 051import org.apache.hadoop.hbase.regionserver.HRegion; 052import org.apache.hadoop.hbase.regionserver.HRegionServer; 053import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 054import org.apache.hadoop.hbase.regionserver.RegionScanner; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 057import org.junit.jupiter.api.TestTemplate; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 062 063import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos; 066 067public class FromClientSideTest3 extends FromClientSideTestBase { 068 069 protected FromClientSideTest3(Class<? extends ConnectionRegistry> registryImpl, 070 int numHedgedReqs) { 071 super(registryImpl, numHedgedReqs); 072 } 073 074 private static final Logger LOG = LoggerFactory.getLogger(FromClientSideTest3.class); 075 076 private static int WAITTABLE_MILLIS; 077 078 protected static void startCluster(Class<?>... cps) throws Exception { 079 WAITTABLE_MILLIS = 10000; 080 SLAVES = 3; 081 initialize(cps); 082 } 083 084 private static List<Cell> toList(ResultScanner scanner) { 085 try { 086 List<Cell> cells = new ArrayList<>(); 087 for (Result r : scanner) { 088 cells.addAll(r.listCells()); 089 } 090 return cells; 091 } finally { 092 scanner.close(); 093 } 094 } 095 096 @TestTemplate 097 public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException { 098 TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }); 099 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 100 try (Connection conn = getConnection(); Table table = conn.getTable(tableName)) { 101 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 102 byte[] row = Bytes.toBytes("SpecifiedRow"); 103 byte[] value0 = Bytes.toBytes("value_0"); 104 byte[] value1 = Bytes.toBytes("value_1"); 105 Put put = new Put(row); 106 put.addColumn(FAMILY, QUALIFIER, VALUE); 107 table.put(put); 108 Delete d = new Delete(row); 109 table.delete(d); 110 put = new Put(row); 111 put.addColumn(FAMILY, null, value0); 112 table.put(put); 113 put = new Put(row); 114 put.addColumn(FAMILY, null, value1); 115 table.put(put); 116 List<Cell> cells = toList(table.getScanner(new Scan())); 117 assertEquals(1, cells.size()); 118 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 119 120 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 121 assertEquals(1, cells.size()); 122 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 123 124 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 125 assertEquals(0, cells.size()); 126 127 TEST_UTIL.getAdmin().flush(tableName); 128 cells = toList(table.getScanner(new Scan())); 129 assertEquals(1, cells.size()); 130 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 131 132 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 133 assertEquals(1, cells.size()); 134 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 135 136 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 137 assertEquals(0, cells.size()); 138 } 139 } 140 141 @TestTemplate 142 public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException { 143 TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }); 144 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 145 try (Connection conn = getConnection(); Table table = conn.getTable(tableName)) { 146 byte[] row = Bytes.toBytes("SpecifiedRow"); 147 byte[] qual0 = Bytes.toBytes("qual0"); 148 byte[] qual1 = Bytes.toBytes("qual1"); 149 long now = EnvironmentEdgeManager.currentTime(); 150 Delete d = new Delete(row, now); 151 table.delete(d); 152 153 Put put = new Put(row); 154 put.addColumn(FAMILY, null, now + 1, VALUE); 155 table.put(put); 156 157 put = new Put(row); 158 put.addColumn(FAMILY, qual1, now + 2, qual1); 159 table.put(put); 160 161 put = new Put(row); 162 put.addColumn(FAMILY, qual0, now + 3, qual0); 163 table.put(put); 164 165 Result r = table.get(new Get(row)); 166 assertEquals(3, r.size(), r.toString()); 167 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 168 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 169 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 170 171 TEST_UTIL.getAdmin().flush(tableName); 172 r = table.get(new Get(row)); 173 assertEquals(3, r.size()); 174 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 175 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 176 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 177 } 178 } 179 180 @TestTemplate 181 public void testPutWithPreBatchMutate() throws Exception { 182 testPreBatchMutate(tableName, () -> { 183 try (Connection conn = getConnection(); Table t = conn.getTable(tableName)) { 184 Put put = new Put(ROW); 185 put.addColumn(FAMILY, QUALIFIER, VALUE); 186 t.put(put); 187 } catch (IOException ex) { 188 throw new RuntimeException(ex); 189 } 190 }); 191 } 192 193 @TestTemplate 194 public void testRowMutationsWithPreBatchMutate() throws Exception { 195 testPreBatchMutate(tableName, () -> { 196 try (Connection conn = getConnection(); Table t = conn.getTable(tableName)) { 197 RowMutations rm = new RowMutations(ROW, 1); 198 Put put = new Put(ROW); 199 put.addColumn(FAMILY, QUALIFIER, VALUE); 200 rm.add(put); 201 t.mutateRow(rm); 202 } catch (IOException ex) { 203 throw new RuntimeException(ex); 204 } 205 }); 206 } 207 208 private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception { 209 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 210 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 211 .setCoprocessor(WaitingForScanObserver.class.getName()).build(); 212 try (Connection conn = getConnection(); Admin admin = conn.getAdmin()) { 213 admin.createTable(tableDescriptor); 214 // Don't use waitTableAvailable(), because the scanner will mess up the co-processor 215 216 ExecutorService service = Executors.newFixedThreadPool(2); 217 service.execute(rn); 218 final List<Cell> cells = new ArrayList<>(); 219 service.execute(() -> { 220 try { 221 // waiting for update. 222 TimeUnit.SECONDS.sleep(3); 223 try (Table t = conn.getTable(tableName)) { 224 Scan scan = new Scan(); 225 try (ResultScanner scanner = t.getScanner(scan)) { 226 for (Result r : scanner) { 227 cells.addAll(Arrays.asList(r.rawCells())); 228 } 229 } 230 } 231 } catch (IOException | InterruptedException ex) { 232 throw new RuntimeException(ex); 233 } 234 }); 235 service.shutdown(); 236 service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 237 assertEquals(0, cells.size(), "The write is blocking by RegionObserver#postBatchMutate" 238 + ", so the data is invisible to reader"); 239 } 240 } 241 242 @TestTemplate 243 public void testLockLeakWithDelta() throws Exception, Throwable { 244 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 245 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 246 .setCoprocessor(WaitingForMultiMutationsObserver.class.getName()) 247 .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build(); 248 TEST_UTIL.getAdmin().createTable(tableDescriptor); 249 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 250 251 // new a connection for lower retry number. 252 Configuration copy = getClientConf(); 253 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 254 try (Connection con = ConnectionFactory.createConnection(copy)) { 255 HRegion region = (HRegion) find(tableName); 256 region.setTimeoutForWriteLock(10); 257 ExecutorService putService = Executors.newSingleThreadExecutor(); 258 putService.execute(() -> { 259 try (Table table = con.getTable(tableName)) { 260 Put put = new Put(ROW); 261 put.addColumn(FAMILY, QUALIFIER, VALUE); 262 // the put will be blocked by WaitingForMultiMutationsObserver. 263 table.put(put); 264 } catch (IOException ex) { 265 throw new RuntimeException(ex); 266 } 267 }); 268 ExecutorService appendService = Executors.newSingleThreadExecutor(); 269 appendService.execute(() -> { 270 Append append = new Append(ROW); 271 append.addColumn(FAMILY, QUALIFIER, VALUE); 272 try (Table table = con.getTable(tableName)) { 273 table.append(append); 274 fail("The APPEND should fail because the target lock is blocked by previous put"); 275 } catch (Exception ex) { 276 } 277 }); 278 appendService.shutdown(); 279 appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 280 WaitingForMultiMutationsObserver observer = 281 find(tableName, WaitingForMultiMutationsObserver.class); 282 observer.latch.countDown(); 283 putService.shutdown(); 284 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 285 try (Table table = con.getTable(tableName)) { 286 Result r = table.get(new Get(ROW)); 287 assertFalse(r.isEmpty()); 288 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE)); 289 } 290 } 291 HRegion region = (HRegion) find(tableName); 292 int readLockCount = region.getReadLockCount(); 293 LOG.info("readLockCount:" + readLockCount); 294 assertEquals(0, readLockCount); 295 } 296 297 @TestTemplate 298 public void testMultiRowMutations() throws Exception, Throwable { 299 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 300 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 301 .setCoprocessor(MultiRowMutationEndpoint.class.getName()) 302 .setCoprocessor(WaitingForMultiMutationsObserver.class.getName()) 303 .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build(); 304 TEST_UTIL.getAdmin().createTable(tableDescriptor); 305 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 306 307 // new a connection for lower retry number. 308 Configuration copy = getClientConf(); 309 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 310 try (Connection con = ConnectionFactory.createConnection(copy)) { 311 byte[] row = Bytes.toBytes("ROW-0"); 312 byte[] rowLocked = Bytes.toBytes("ROW-1"); 313 byte[] value0 = Bytes.toBytes("VALUE-0"); 314 byte[] value1 = Bytes.toBytes("VALUE-1"); 315 byte[] value2 = Bytes.toBytes("VALUE-2"); 316 assertNoLocks(tableName); 317 ExecutorService putService = Executors.newSingleThreadExecutor(); 318 putService.execute(() -> { 319 try (Table table = con.getTable(tableName)) { 320 Put put0 = new Put(rowLocked); 321 put0.addColumn(FAMILY, QUALIFIER, value0); 322 // the put will be blocked by WaitingForMultiMutationsObserver. 323 table.put(put0); 324 } catch (IOException ex) { 325 throw new RuntimeException(ex); 326 } 327 }); 328 ExecutorService cpService = Executors.newSingleThreadExecutor(); 329 AtomicBoolean exceptionDuringMutateRows = new AtomicBoolean(); 330 cpService.execute(() -> { 331 Put put1 = new Put(row); 332 Put put2 = new Put(rowLocked); 333 put1.addColumn(FAMILY, QUALIFIER, value1); 334 put2.addColumn(FAMILY, QUALIFIER, value2); 335 try (Table table = con.getTable(tableName)) { 336 MultiRowMutationProtos.MutateRowsRequest request = 337 MultiRowMutationProtos.MutateRowsRequest.newBuilder() 338 .addMutationRequest( 339 ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put1)) 340 .addMutationRequest( 341 ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put2)) 342 .build(); 343 table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, ROW, ROW, 344 (MultiRowMutationProtos.MultiRowMutationService exe) -> { 345 ServerRpcController controller = new ServerRpcController(); 346 CoprocessorRpcUtils.BlockingRpcCallback< 347 MultiRowMutationProtos.MutateRowsResponse> rpcCallback = 348 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 349 exe.mutateRows(controller, request, rpcCallback); 350 if ( 351 controller.failedOnException() 352 && !(controller.getFailedOn() instanceof UnknownProtocolException) 353 ) { 354 exceptionDuringMutateRows.set(true); 355 } 356 return rpcCallback.get(); 357 }); 358 } catch (Throwable ex) { 359 LOG.error("encountered " + ex); 360 } 361 }); 362 cpService.shutdown(); 363 cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 364 WaitingForMultiMutationsObserver observer = 365 find(tableName, WaitingForMultiMutationsObserver.class); 366 observer.latch.countDown(); 367 putService.shutdown(); 368 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 369 try (Table table = con.getTable(tableName)) { 370 Get g0 = new Get(row); 371 Get g1 = new Get(rowLocked); 372 Result r0 = table.get(g0); 373 Result r1 = table.get(g1); 374 assertTrue(r0.isEmpty()); 375 assertFalse(r1.isEmpty()); 376 assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); 377 } 378 assertNoLocks(tableName); 379 if (!exceptionDuringMutateRows.get()) { 380 fail("This cp should fail because the target lock is blocked by previous put"); 381 } 382 } 383 } 384 385 /** 386 * A test case for issue HBASE-17482 After combile seqid with mvcc readpoint, seqid/mvcc is 387 * acquired and stamped onto cells in the append thread, a countdown latch is used to ensure that 388 * happened before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) make 389 * the seqid/mvcc acquirement in handler thread and stamping in append thread No countdown latch 390 * to assure cells in memstore are stamped with seqid/mvcc. If cells without mvcc(A.K.A mvcc=0) 391 * are put into memstore, then a scanner with a smaller readpoint can see these data, which 392 * disobey the multi version concurrency control rules. This test case is to reproduce this 393 * scenario. 394 */ 395 @TestTemplate 396 public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException { 397 TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }); 398 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 399 try (Connection conn = getConnection(); Table table = conn.getTable(tableName)) { 400 // put two row first to init the scanner 401 Put put = new Put(Bytes.toBytes("0")); 402 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 403 table.put(put); 404 put = new Put(Bytes.toBytes("00")); 405 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 406 table.put(put); 407 Scan scan = new Scan(); 408 scan.setTimeRange(0, Long.MAX_VALUE); 409 scan.setCaching(1); 410 try (ResultScanner scanner = table.getScanner(scan)) { 411 int rowNum = scanner.next() != null ? 1 : 0; 412 // the started scanner shouldn't see the rows put below 413 for (int i = 1; i < 1000; i++) { 414 put = new Put(Bytes.toBytes(String.valueOf(i))); 415 put.setDurability(Durability.ASYNC_WAL); 416 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i)); 417 table.put(put); 418 } 419 rowNum += Iterables.size(scanner); 420 // scanner should only see two rows 421 assertEquals(2, rowNum); 422 } 423 try (ResultScanner scanner = table.getScanner(scan)) { 424 int rowNum = Iterables.size(scanner); 425 // the new scanner should see all rows 426 assertEquals(1001, rowNum); 427 } 428 } 429 } 430 431 private static void assertNoLocks(final TableName tableName) 432 throws IOException, InterruptedException { 433 HRegion region = (HRegion) find(tableName); 434 assertEquals(0, region.getLockedRows().size()); 435 } 436 437 private static HRegion find(final TableName tableName) throws IOException, InterruptedException { 438 HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); 439 List<HRegion> regions = rs.getRegions(tableName); 440 assertEquals(1, regions.size()); 441 return regions.get(0); 442 } 443 444 private static <T extends RegionObserver> T find(final TableName tableName, Class<T> clz) 445 throws IOException, InterruptedException { 446 HRegion region = find(tableName); 447 Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); 448 assertTrue(clz.isInstance(cp), "The cp instance should be " + clz.getName() 449 + ", current instance is " + cp.getClass().getName()); 450 return clz.cast(cp); 451 } 452 453 public static class WaitingForMultiMutationsObserver 454 implements RegionCoprocessor, RegionObserver { 455 final CountDownLatch latch = new CountDownLatch(1); 456 457 @Override 458 public Optional<RegionObserver> getRegionObserver() { 459 return Optional.of(this); 460 } 461 462 @Override 463 public void postBatchMutate(final ObserverContext<? extends RegionCoprocessorEnvironment> c, 464 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 465 try { 466 latch.await(); 467 } catch (InterruptedException ex) { 468 throw new IOException(ex); 469 } 470 } 471 } 472 473 public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver { 474 private final CountDownLatch latch = new CountDownLatch(1); 475 476 @Override 477 public Optional<RegionObserver> getRegionObserver() { 478 return Optional.of(this); 479 } 480 481 @Override 482 public void postBatchMutate(final ObserverContext<? extends RegionCoprocessorEnvironment> c, 483 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 484 try { 485 // waiting for scanner 486 latch.await(); 487 } catch (InterruptedException ex) { 488 throw new IOException(ex); 489 } 490 } 491 492 @Override 493 public RegionScanner postScannerOpen( 494 final ObserverContext<? extends RegionCoprocessorEnvironment> e, final Scan scan, 495 final RegionScanner s) throws IOException { 496 latch.countDown(); 497 return s; 498 } 499 } 500 501 static byte[] generateHugeValue(int size) { 502 Random rand = ThreadLocalRandom.current(); 503 byte[] value = new byte[size]; 504 for (int i = 0; i < value.length; i++) { 505 value[i] = (byte) rand.nextInt(256); 506 } 507 return value; 508 } 509 510 @TestTemplate 511 public void testScanWithBatchSizeReturnIncompleteCells() 512 throws IOException, InterruptedException { 513 TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName) 514 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build()) 515 .build(); 516 try (Table table = TEST_UTIL.createTable(hd, null)) { 517 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 518 519 Put put = new Put(ROW); 520 put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024)); 521 table.put(put); 522 523 put = new Put(ROW); 524 put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024)); 525 table.put(put); 526 527 for (int i = 2; i < 5; i++) { 528 for (int version = 0; version < 2; version++) { 529 put = new Put(ROW); 530 put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024)); 531 table.put(put); 532 } 533 } 534 535 Scan scan = new Scan(); 536 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3) 537 .setMaxResultSize(4 * 1024 * 1024); 538 Result result; 539 try (ResultScanner scanner = table.getScanner(scan)) { 540 List<Result> list = new ArrayList<>(); 541 /* 542 * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The 543 * second scan rpc should return a result with 3 cells, because reach the batch limit = 3; 544 * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the 545 * moreResultsInRegion also would be false. Finally, the client should collect all the cells 546 * into two result: 2+3 -> 3+2; 547 */ 548 while ((result = scanner.next()) != null) { 549 list.add(result); 550 } 551 552 assertEquals(5, list.stream().mapToInt(Result::size).sum()); 553 assertEquals(2, list.size()); 554 assertEquals(3, list.get(0).size()); 555 assertEquals(2, list.get(1).size()); 556 } 557 558 scan = new Scan(); 559 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2) 560 .setMaxResultSize(4 * 1024 * 1024); 561 try (ResultScanner scanner = table.getScanner(scan)) { 562 List<Result> list = new ArrayList<>(); 563 while ((result = scanner.next()) != null) { 564 list.add(result); 565 } 566 assertEquals(5, list.stream().mapToInt(Result::size).sum()); 567 assertEquals(3, list.size()); 568 assertEquals(2, list.get(0).size()); 569 assertEquals(2, list.get(1).size()); 570 assertEquals(1, list.get(2).size()); 571 } 572 } 573 } 574}