001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertNull; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.junit.jupiter.api.Assertions.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.List; 031import java.util.Optional; 032import java.util.Random; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicBoolean; 039import java.util.concurrent.atomic.AtomicInteger; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.Cell; 042import org.apache.hadoop.hbase.CellUtil; 043import org.apache.hadoop.hbase.Coprocessor; 044import org.apache.hadoop.hbase.HBaseTestingUtil; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.HRegionLocation; 047import org.apache.hadoop.hbase.RegionMetrics; 048import org.apache.hadoop.hbase.ServerName; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 051import org.apache.hadoop.hbase.coprocessor.ObserverContext; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 054import org.apache.hadoop.hbase.coprocessor.RegionObserver; 055import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 056import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 057import org.apache.hadoop.hbase.ipc.RpcClient; 058import org.apache.hadoop.hbase.ipc.RpcClientFactory; 059import org.apache.hadoop.hbase.ipc.ServerRpcController; 060import org.apache.hadoop.hbase.regionserver.HRegion; 061import org.apache.hadoop.hbase.regionserver.HRegionServer; 062import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 063import org.apache.hadoop.hbase.regionserver.RegionScanner; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.junit.jupiter.api.AfterAll; 067import org.junit.jupiter.api.AfterEach; 068import org.junit.jupiter.api.BeforeEach; 069import org.junit.jupiter.api.Test; 070import org.junit.jupiter.api.TestInfo; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos; 077 078public class FromClientSide3TestBase { 079 080 private static final Logger LOG = LoggerFactory.getLogger(FromClientSide3TestBase.class); 081 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 082 083 private static int WAITTABLE_MILLIS; 084 private static byte[] FAMILY; 085 private static int SLAVES; 086 private static byte[] ROW; 087 private static byte[] ANOTHERROW; 088 private static byte[] QUALIFIER; 089 private static byte[] VALUE; 090 private static byte[] COL_QUAL; 091 private static byte[] VAL_BYTES; 092 private static byte[] ROW_BYTES; 093 094 private TableName tableName; 095 096 protected static void startCluster() throws Exception { 097 WAITTABLE_MILLIS = 10000; 098 FAMILY = Bytes.toBytes("testFamily"); 099 SLAVES = 3; 100 ROW = Bytes.toBytes("testRow"); 101 ANOTHERROW = Bytes.toBytes("anotherrow"); 102 QUALIFIER = Bytes.toBytes("testQualifier"); 103 VALUE = Bytes.toBytes("testValue"); 104 COL_QUAL = Bytes.toBytes("f1"); 105 VAL_BYTES = Bytes.toBytes("v1"); 106 ROW_BYTES = Bytes.toBytes("r1"); 107 TEST_UTIL.startMiniCluster(SLAVES); 108 } 109 110 @AfterAll 111 public static void shutdownCluster() throws Exception { 112 TEST_UTIL.shutdownMiniCluster(); 113 } 114 115 @BeforeEach 116 public void setUp(TestInfo testInfo) throws Exception { 117 tableName = TableName.valueOf(testInfo.getTestMethod().get().getName()); 118 } 119 120 @AfterEach 121 public void tearDown() throws Exception { 122 for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { 123 LOG.info("Tear down, remove table=" + htd.getTableName()); 124 TEST_UTIL.deleteTable(htd.getTableName()); 125 } 126 } 127 128 private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts) throws Exception { 129 Put put = new Put(row); 130 Random rand = ThreadLocalRandom.current(); 131 for (int i = 0; i < nPuts; i++) { 132 byte[] qualifier = Bytes.toBytes(rand.nextInt()); 133 byte[] value = Bytes.toBytes(rand.nextInt()); 134 put.addColumn(family, qualifier, value); 135 } 136 table.put(put); 137 } 138 139 private void performMultiplePutAndFlush(Admin admin, Table table, byte[] row, byte[] family, 140 int nFlushes, int nPuts) throws Exception { 141 for (int i = 0; i < nFlushes; i++) { 142 randomCFPuts(table, row, family, nPuts); 143 admin.flush(table.getName()); 144 } 145 } 146 147 private static List<Cell> toList(ResultScanner scanner) { 148 try { 149 List<Cell> cells = new ArrayList<>(); 150 for (Result r : scanner) { 151 cells.addAll(r.listCells()); 152 } 153 return cells; 154 } finally { 155 scanner.close(); 156 } 157 } 158 159 @Test 160 public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException { 161 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 162 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 163 byte[] row = Bytes.toBytes("SpecifiedRow"); 164 byte[] value0 = Bytes.toBytes("value_0"); 165 byte[] value1 = Bytes.toBytes("value_1"); 166 Put put = new Put(row); 167 put.addColumn(FAMILY, QUALIFIER, VALUE); 168 table.put(put); 169 Delete d = new Delete(row); 170 table.delete(d); 171 put = new Put(row); 172 put.addColumn(FAMILY, null, value0); 173 table.put(put); 174 put = new Put(row); 175 put.addColumn(FAMILY, null, value1); 176 table.put(put); 177 List<Cell> cells = toList(table.getScanner(new Scan())); 178 assertEquals(1, cells.size()); 179 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 180 181 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 182 assertEquals(1, cells.size()); 183 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 184 185 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 186 assertEquals(0, cells.size()); 187 188 TEST_UTIL.getAdmin().flush(tableName); 189 cells = toList(table.getScanner(new Scan())); 190 assertEquals(1, cells.size()); 191 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 192 193 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 194 assertEquals(1, cells.size()); 195 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 196 197 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 198 assertEquals(0, cells.size()); 199 } 200 } 201 202 @Test 203 public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException { 204 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 205 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 206 byte[] row = Bytes.toBytes("SpecifiedRow"); 207 byte[] qual0 = Bytes.toBytes("qual0"); 208 byte[] qual1 = Bytes.toBytes("qual1"); 209 long now = EnvironmentEdgeManager.currentTime(); 210 Delete d = new Delete(row, now); 211 table.delete(d); 212 213 Put put = new Put(row); 214 put.addColumn(FAMILY, null, now + 1, VALUE); 215 table.put(put); 216 217 put = new Put(row); 218 put.addColumn(FAMILY, qual1, now + 2, qual1); 219 table.put(put); 220 221 put = new Put(row); 222 put.addColumn(FAMILY, qual0, now + 3, qual0); 223 table.put(put); 224 225 Result r = table.get(new Get(row)); 226 assertEquals(3, r.size(), r.toString()); 227 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 228 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 229 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 230 231 TEST_UTIL.getAdmin().flush(tableName); 232 r = table.get(new Get(row)); 233 assertEquals(3, r.size()); 234 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 235 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 236 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 237 } 238 } 239 240 private int getStoreFileCount(Admin admin, ServerName serverName, RegionInfo region) 241 throws IOException { 242 for (RegionMetrics metrics : admin.getRegionMetrics(serverName, region.getTable())) { 243 if (Bytes.equals(region.getRegionName(), metrics.getRegionName())) { 244 return metrics.getStoreFileCount(); 245 } 246 } 247 return 0; 248 } 249 250 // override the config settings at the CF level and ensure priority 251 @Test 252 public void testAdvancedConfigOverride() throws Exception { 253 /* 254 * Overall idea: (1) create 3 store files and issue a compaction. config's compaction.min == 3, 255 * so should work. (2) Increase the compaction.min toggle in the HTD to 5 and modify table. If 256 * we use the HTD value instead of the default config value, adding 3 files and issuing a 257 * compaction SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2 and modify 258 * table. The CF schema should override the Table schema and now cause a minor compaction. 259 */ 260 TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3); 261 262 try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 10)) { 263 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 264 Admin admin = TEST_UTIL.getAdmin(); 265 266 // Create 3 store files. 267 byte[] row = Bytes.toBytes(ThreadLocalRandom.current().nextInt()); 268 performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 100); 269 270 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 271 // Verify we have multiple store files. 272 HRegionLocation loc = locator.getRegionLocation(row, true); 273 assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) > 1); 274 275 // Issue a compaction request 276 admin.compact(tableName); 277 278 // poll wait for the compactions to happen 279 for (int i = 0; i < 10 * 1000 / 40; ++i) { 280 // The number of store files after compaction should be lesser. 281 loc = locator.getRegionLocation(row, true); 282 if (!loc.getRegion().isOffline()) { 283 if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1) { 284 break; 285 } 286 } 287 Thread.sleep(40); 288 } 289 // verify the compactions took place and that we didn't just time out 290 assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1); 291 292 // change the compaction.min config option for this table to 5 293 LOG.info("hbase.hstore.compaction.min should now be 5"); 294 TableDescriptor htd = TableDescriptorBuilder.newBuilder(table.getDescriptor()) 295 .setValue("hbase.hstore.compaction.min", String.valueOf(5)).build(); 296 admin.modifyTable(htd); 297 LOG.info("alter status finished"); 298 299 // Create 3 more store files. 300 performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 10); 301 302 // Issue a compaction request 303 admin.compact(tableName); 304 305 // This time, the compaction request should not happen 306 Thread.sleep(10 * 1000); 307 loc = locator.getRegionLocation(row, true); 308 int sfCount = getStoreFileCount(admin, loc.getServerName(), loc.getRegion()); 309 assertTrue(sfCount > 1); 310 311 // change an individual CF's config option to 2 & online schema update 312 LOG.info("hbase.hstore.compaction.min should now be 2"); 313 htd = TableDescriptorBuilder.newBuilder(htd) 314 .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY)) 315 .setValue("hbase.hstore.compaction.min", String.valueOf(2)).build()) 316 .build(); 317 admin.modifyTable(htd); 318 LOG.info("alter status finished"); 319 320 // Issue a compaction request 321 admin.compact(tableName); 322 323 // poll wait for the compactions to happen 324 for (int i = 0; i < 10 * 1000 / 40; ++i) { 325 loc = locator.getRegionLocation(row, true); 326 try { 327 if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount) { 328 break; 329 } 330 } catch (Exception e) { 331 LOG.debug("Waiting for region to come online: " 332 + Bytes.toStringBinary(loc.getRegion().getRegionName())); 333 } 334 Thread.sleep(40); 335 } 336 337 // verify the compaction took place and that we didn't just time out 338 assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount); 339 340 // Finally, ensure that we can remove a custom config value after we made it 341 LOG.info("Removing CF config value"); 342 LOG.info("hbase.hstore.compaction.min should now be 5"); 343 htd = TableDescriptorBuilder.newBuilder(htd) 344 .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY)) 345 .setValue("hbase.hstore.compaction.min", null).build()) 346 .build(); 347 admin.modifyTable(htd); 348 LOG.info("alter status finished"); 349 assertNull(table.getDescriptor().getColumnFamily(FAMILY) 350 .getValue(Bytes.toBytes("hbase.hstore.compaction.min"))); 351 } 352 } 353 } 354 355 @Test 356 public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException { 357 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 358 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 359 List<Put> actions = new ArrayList<>(); 360 Object[] results = new Object[2]; 361 // create an empty Put 362 Put put1 = new Put(ROW); 363 actions.add(put1); 364 365 Put put2 = new Put(ANOTHERROW); 366 put2.addColumn(FAMILY, QUALIFIER, VALUE); 367 actions.add(put2); 368 369 table.batch(actions, results); 370 fail("Empty Put should have failed the batch call"); 371 } catch (IllegalArgumentException iae) { 372 } 373 } 374 375 // Test Table.batch with large amount of mutations against the same key. 376 // It used to trigger read lock's "Maximum lock count exceeded" Error. 377 @Test 378 public void testHTableWithLargeBatch() throws IOException, InterruptedException { 379 int sixtyFourK = 64 * 1024; 380 List<Put> actions = new ArrayList<>(); 381 Object[] results = new Object[(sixtyFourK + 1) * 2]; 382 383 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 384 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 385 386 for (int i = 0; i < sixtyFourK + 1; i++) { 387 Put put1 = new Put(ROW); 388 put1.addColumn(FAMILY, QUALIFIER, VALUE); 389 actions.add(put1); 390 391 Put put2 = new Put(ANOTHERROW); 392 put2.addColumn(FAMILY, QUALIFIER, VALUE); 393 actions.add(put2); 394 } 395 396 table.batch(actions, results); 397 } 398 } 399 400 @Test 401 public void testBatchWithRowMutation() throws Exception { 402 LOG.info("Starting testBatchWithRowMutation"); 403 byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") }; 404 405 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 406 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 407 408 RowMutations arm = RowMutations 409 .of(Collections.singletonList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE))); 410 Object[] batchResult = new Object[1]; 411 table.batch(Arrays.asList(arm), batchResult); 412 413 Get g = new Get(ROW); 414 Result r = table.get(g); 415 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); 416 417 arm = RowMutations.of(Arrays.asList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE), 418 new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0]))); 419 table.batch(Arrays.asList(arm), batchResult); 420 r = table.get(g); 421 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); 422 assertNull(r.getValue(FAMILY, QUALIFIERS[0])); 423 424 // Test that we get the correct remote exception for RowMutations from batch() 425 try { 426 arm = RowMutations.of(Collections.singletonList( 427 new Put(ROW).addColumn(new byte[] { 'b', 'o', 'g', 'u', 's' }, QUALIFIERS[0], VALUE))); 428 table.batch(Arrays.asList(arm), batchResult); 429 fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException"); 430 } catch (RetriesExhaustedException e) { 431 String msg = e.getMessage(); 432 assertTrue(msg.contains("NoSuchColumnFamilyException")); 433 } 434 } 435 } 436 437 @Test 438 public void testBatchWithCheckAndMutate() throws Exception { 439 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 440 byte[] row1 = Bytes.toBytes("row1"); 441 byte[] row2 = Bytes.toBytes("row2"); 442 byte[] row3 = Bytes.toBytes("row3"); 443 byte[] row4 = Bytes.toBytes("row4"); 444 byte[] row5 = Bytes.toBytes("row5"); 445 byte[] row6 = Bytes.toBytes("row6"); 446 byte[] row7 = Bytes.toBytes("row7"); 447 448 table 449 .put(Arrays.asList(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 450 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 451 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 452 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), 453 new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), 454 new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)), 455 new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))); 456 457 CheckAndMutate checkAndMutate1 = 458 CheckAndMutate.newBuilder(row1).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 459 .build(new RowMutations(row1) 460 .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g"))) 461 .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A"))) 462 .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L)) 463 .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); 464 Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); 465 RowMutations mutations = 466 new RowMutations(row3).add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) 467 .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 468 .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L)) 469 .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 470 CheckAndMutate checkAndMutate2 = 471 CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) 472 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); 473 Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); 474 CheckAndMutate checkAndMutate3 = 475 CheckAndMutate.newBuilder(row6).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)) 476 .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1)); 477 CheckAndMutate checkAndMutate4 = 478 CheckAndMutate.newBuilder(row7).ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")) 479 .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))); 480 481 List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put, 482 checkAndMutate3, checkAndMutate4); 483 Object[] results = new Object[actions.size()]; 484 table.batch(actions, results); 485 486 CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0]; 487 assertTrue(checkAndMutateResult.isSuccess()); 488 assertEquals(3L, 489 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C")))); 490 assertEquals("d", 491 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D")))); 492 493 assertEquals("b", Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B")))); 494 495 Result result = (Result) results[2]; 496 assertTrue(result.getExists()); 497 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 498 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 499 500 checkAndMutateResult = (CheckAndMutateResult) results[3]; 501 assertFalse(checkAndMutateResult.isSuccess()); 502 assertNull(checkAndMutateResult.getResult()); 503 504 assertTrue(((Result) results[4]).isEmpty()); 505 506 checkAndMutateResult = (CheckAndMutateResult) results[5]; 507 assertTrue(checkAndMutateResult.isSuccess()); 508 assertEquals(11, 509 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("F")))); 510 511 checkAndMutateResult = (CheckAndMutateResult) results[6]; 512 assertTrue(checkAndMutateResult.isSuccess()); 513 assertEquals("gg", 514 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("G")))); 515 516 result = table.get(new Get(row1)); 517 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 518 assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); 519 assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); 520 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 521 522 result = table.get(new Get(row3)); 523 assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); 524 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 525 assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 526 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 527 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 528 529 result = table.get(new Get(row4)); 530 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 531 532 result = table.get(new Get(row5)); 533 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); 534 535 result = table.get(new Get(row6)); 536 assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F")))); 537 538 result = table.get(new Get(row7)); 539 assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G")))); 540 } 541 } 542 543 @Test 544 public void testHTableExistsMethodSingleRegionSingleGet() 545 throws IOException, InterruptedException { 546 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 547 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 548 549 // Test with a single region table. 550 Put put = new Put(ROW); 551 put.addColumn(FAMILY, QUALIFIER, VALUE); 552 553 Get get = new Get(ROW); 554 555 boolean exist = table.exists(get); 556 assertFalse(exist); 557 558 table.put(put); 559 560 exist = table.exists(get); 561 assertTrue(exist); 562 } 563 } 564 565 @Test 566 public void testHTableExistsMethodSingleRegionMultipleGets() 567 throws IOException, InterruptedException { 568 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 569 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 570 571 Put put = new Put(ROW); 572 put.addColumn(FAMILY, QUALIFIER, VALUE); 573 table.put(put); 574 575 List<Get> gets = new ArrayList<>(); 576 gets.add(new Get(ROW)); 577 gets.add(new Get(ANOTHERROW)); 578 579 boolean[] results = table.exists(gets); 580 assertTrue(results[0]); 581 assertFalse(results[1]); 582 } 583 } 584 585 @Test 586 public void testHTableExistsBeforeGet() throws IOException, InterruptedException { 587 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 588 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 589 590 Put put = new Put(ROW); 591 put.addColumn(FAMILY, QUALIFIER, VALUE); 592 table.put(put); 593 594 Get get = new Get(ROW); 595 596 boolean exist = table.exists(get); 597 assertEquals(true, exist); 598 599 Result result = table.get(get); 600 assertEquals(false, result.isEmpty()); 601 assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER))); 602 } 603 } 604 605 @Test 606 public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException { 607 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 608 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 609 610 final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2")); 611 Put put = new Put(ROW); 612 put.addColumn(FAMILY, QUALIFIER, VALUE); 613 table.put(put); 614 put = new Put(ROW2); 615 put.addColumn(FAMILY, QUALIFIER, VALUE); 616 table.put(put); 617 618 Get get = new Get(ROW); 619 Get get2 = new Get(ROW2); 620 ArrayList<Get> getList = new ArrayList<>(2); 621 getList.add(get); 622 getList.add(get2); 623 624 boolean[] exists = table.exists(getList); 625 assertEquals(true, exists[0]); 626 assertEquals(true, exists[1]); 627 628 Result[] result = table.get(getList); 629 assertEquals(false, result[0].isEmpty()); 630 assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER))); 631 assertEquals(false, result[1].isEmpty()); 632 assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER))); 633 } 634 } 635 636 @Test 637 public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception { 638 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 639 new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { 640 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 641 642 Put put = new Put(ROW); 643 put.addColumn(FAMILY, QUALIFIER, VALUE); 644 645 Get get = new Get(ROW); 646 647 boolean exist = table.exists(get); 648 assertFalse(exist); 649 650 table.put(put); 651 652 exist = table.exists(get); 653 assertTrue(exist); 654 } 655 } 656 657 @Test 658 public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception { 659 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 660 new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { 661 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 662 663 Put put = new Put(ROW); 664 put.addColumn(FAMILY, QUALIFIER, VALUE); 665 table.put(put); 666 667 List<Get> gets = new ArrayList<>(); 668 gets.add(new Get(ANOTHERROW)); 669 gets.add(new Get(Bytes.add(ROW, new byte[] { 0x00 }))); 670 gets.add(new Get(ROW)); 671 gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 }))); 672 673 LOG.info("Calling exists"); 674 boolean[] results = table.exists(gets); 675 assertFalse(results[0]); 676 assertFalse(results[1]); 677 assertTrue(results[2]); 678 assertFalse(results[3]); 679 680 // Test with the first region. 681 put = new Put(new byte[] { 0x00 }); 682 put.addColumn(FAMILY, QUALIFIER, VALUE); 683 table.put(put); 684 685 gets = new ArrayList<>(); 686 gets.add(new Get(new byte[] { 0x00 })); 687 gets.add(new Get(new byte[] { 0x00, 0x00 })); 688 results = table.exists(gets); 689 assertTrue(results[0]); 690 assertFalse(results[1]); 691 692 // Test with the last region 693 put = new Put(new byte[] { (byte) 0xff, (byte) 0xff }); 694 put.addColumn(FAMILY, QUALIFIER, VALUE); 695 table.put(put); 696 697 gets = new ArrayList<>(); 698 gets.add(new Get(new byte[] { (byte) 0xff })); 699 gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff })); 700 gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff })); 701 results = table.exists(gets); 702 assertFalse(results[0]); 703 assertTrue(results[1]); 704 assertFalse(results[2]); 705 } 706 } 707 708 @Test 709 public void testGetEmptyRow() throws Exception { 710 // Create a table and put in 1 row 711 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 712 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 713 714 Put put = new Put(ROW_BYTES); 715 put.addColumn(FAMILY, COL_QUAL, VAL_BYTES); 716 table.put(put); 717 718 // Try getting the row with an empty row key 719 Result res = null; 720 try { 721 res = table.get(new Get(new byte[0])); 722 fail(); 723 } catch (IllegalArgumentException e) { 724 // Expected. 725 } 726 assertTrue(res == null); 727 res = table.get(new Get(Bytes.toBytes("r1-not-exist"))); 728 assertTrue(res.isEmpty() == true); 729 res = table.get(new Get(ROW_BYTES)); 730 assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); 731 } 732 } 733 734 @Test 735 public void testConnectionDefaultUsesCodec() throws Exception { 736 try ( 737 RpcClient client = RpcClientFactory.createClient(TEST_UTIL.getConfiguration(), "cluster")) { 738 assertTrue(client.hasCellBlockSupport()); 739 } 740 } 741 742 @Test 743 public void testPutWithPreBatchMutate() throws Exception { 744 testPreBatchMutate(tableName, () -> { 745 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 746 Put put = new Put(ROW); 747 put.addColumn(FAMILY, QUALIFIER, VALUE); 748 t.put(put); 749 } catch (IOException ex) { 750 throw new RuntimeException(ex); 751 } 752 }); 753 } 754 755 @Test 756 public void testRowMutationsWithPreBatchMutate() throws Exception { 757 testPreBatchMutate(tableName, () -> { 758 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 759 RowMutations rm = new RowMutations(ROW, 1); 760 Put put = new Put(ROW); 761 put.addColumn(FAMILY, QUALIFIER, VALUE); 762 rm.add(put); 763 t.mutateRow(rm); 764 } catch (IOException ex) { 765 throw new RuntimeException(ex); 766 } 767 }); 768 } 769 770 private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception { 771 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 772 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 773 .setCoprocessor(WaitingForScanObserver.class.getName()).build(); 774 TEST_UTIL.getAdmin().createTable(tableDescriptor); 775 // Don't use waitTableAvailable(), because the scanner will mess up the co-processor 776 777 ExecutorService service = Executors.newFixedThreadPool(2); 778 service.execute(rn); 779 final List<Cell> cells = new ArrayList<>(); 780 service.execute(() -> { 781 try { 782 // waiting for update. 783 TimeUnit.SECONDS.sleep(3); 784 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 785 Scan scan = new Scan(); 786 try (ResultScanner scanner = t.getScanner(scan)) { 787 for (Result r : scanner) { 788 cells.addAll(Arrays.asList(r.rawCells())); 789 } 790 } 791 } 792 } catch (IOException | InterruptedException ex) { 793 throw new RuntimeException(ex); 794 } 795 }); 796 service.shutdown(); 797 service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 798 assertEquals(0, cells.size(), "The write is blocking by RegionObserver#postBatchMutate" 799 + ", so the data is invisible to reader"); 800 TEST_UTIL.deleteTable(tableName); 801 } 802 803 @Test 804 public void testLockLeakWithDelta() throws Exception, Throwable { 805 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 806 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 807 .setCoprocessor(WaitingForMultiMutationsObserver.class.getName()) 808 .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build(); 809 TEST_UTIL.getAdmin().createTable(tableDescriptor); 810 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 811 812 // new a connection for lower retry number. 813 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 814 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 815 try (Connection con = ConnectionFactory.createConnection(copy)) { 816 HRegion region = (HRegion) find(tableName); 817 region.setTimeoutForWriteLock(10); 818 ExecutorService putService = Executors.newSingleThreadExecutor(); 819 putService.execute(() -> { 820 try (Table table = con.getTable(tableName)) { 821 Put put = new Put(ROW); 822 put.addColumn(FAMILY, QUALIFIER, VALUE); 823 // the put will be blocked by WaitingForMultiMutationsObserver. 824 table.put(put); 825 } catch (IOException ex) { 826 throw new RuntimeException(ex); 827 } 828 }); 829 ExecutorService appendService = Executors.newSingleThreadExecutor(); 830 appendService.execute(() -> { 831 Append append = new Append(ROW); 832 append.addColumn(FAMILY, QUALIFIER, VALUE); 833 try (Table table = con.getTable(tableName)) { 834 table.append(append); 835 fail("The APPEND should fail because the target lock is blocked by previous put"); 836 } catch (Exception ex) { 837 } 838 }); 839 appendService.shutdown(); 840 appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 841 WaitingForMultiMutationsObserver observer = 842 find(tableName, WaitingForMultiMutationsObserver.class); 843 observer.latch.countDown(); 844 putService.shutdown(); 845 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 846 try (Table table = con.getTable(tableName)) { 847 Result r = table.get(new Get(ROW)); 848 assertFalse(r.isEmpty()); 849 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE)); 850 } 851 } 852 HRegion region = (HRegion) find(tableName); 853 int readLockCount = region.getReadLockCount(); 854 LOG.info("readLockCount:" + readLockCount); 855 assertEquals(0, readLockCount); 856 } 857 858 @Test 859 public void testMultiRowMutations() throws Exception, Throwable { 860 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 861 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 862 .setCoprocessor(MultiRowMutationEndpoint.class.getName()) 863 .setCoprocessor(WaitingForMultiMutationsObserver.class.getName()) 864 .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build(); 865 TEST_UTIL.getAdmin().createTable(tableDescriptor); 866 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 867 868 // new a connection for lower retry number. 869 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 870 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 871 try (Connection con = ConnectionFactory.createConnection(copy)) { 872 byte[] row = Bytes.toBytes("ROW-0"); 873 byte[] rowLocked = Bytes.toBytes("ROW-1"); 874 byte[] value0 = Bytes.toBytes("VALUE-0"); 875 byte[] value1 = Bytes.toBytes("VALUE-1"); 876 byte[] value2 = Bytes.toBytes("VALUE-2"); 877 assertNoLocks(tableName); 878 ExecutorService putService = Executors.newSingleThreadExecutor(); 879 putService.execute(() -> { 880 try (Table table = con.getTable(tableName)) { 881 Put put0 = new Put(rowLocked); 882 put0.addColumn(FAMILY, QUALIFIER, value0); 883 // the put will be blocked by WaitingForMultiMutationsObserver. 884 table.put(put0); 885 } catch (IOException ex) { 886 throw new RuntimeException(ex); 887 } 888 }); 889 ExecutorService cpService = Executors.newSingleThreadExecutor(); 890 AtomicBoolean exceptionDuringMutateRows = new AtomicBoolean(); 891 cpService.execute(() -> { 892 Put put1 = new Put(row); 893 Put put2 = new Put(rowLocked); 894 put1.addColumn(FAMILY, QUALIFIER, value1); 895 put2.addColumn(FAMILY, QUALIFIER, value2); 896 try (Table table = con.getTable(tableName)) { 897 MultiRowMutationProtos.MutateRowsRequest request = 898 MultiRowMutationProtos.MutateRowsRequest.newBuilder() 899 .addMutationRequest( 900 ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put1)) 901 .addMutationRequest( 902 ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put2)) 903 .build(); 904 table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, ROW, ROW, 905 (MultiRowMutationProtos.MultiRowMutationService exe) -> { 906 ServerRpcController controller = new ServerRpcController(); 907 CoprocessorRpcUtils.BlockingRpcCallback< 908 MultiRowMutationProtos.MutateRowsResponse> rpcCallback = 909 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 910 exe.mutateRows(controller, request, rpcCallback); 911 if ( 912 controller.failedOnException() 913 && !(controller.getFailedOn() instanceof UnknownProtocolException) 914 ) { 915 exceptionDuringMutateRows.set(true); 916 } 917 return rpcCallback.get(); 918 }); 919 } catch (Throwable ex) { 920 LOG.error("encountered " + ex); 921 } 922 }); 923 cpService.shutdown(); 924 cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 925 WaitingForMultiMutationsObserver observer = 926 find(tableName, WaitingForMultiMutationsObserver.class); 927 observer.latch.countDown(); 928 putService.shutdown(); 929 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 930 try (Table table = con.getTable(tableName)) { 931 Get g0 = new Get(row); 932 Get g1 = new Get(rowLocked); 933 Result r0 = table.get(g0); 934 Result r1 = table.get(g1); 935 assertTrue(r0.isEmpty()); 936 assertFalse(r1.isEmpty()); 937 assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); 938 } 939 assertNoLocks(tableName); 940 if (!exceptionDuringMutateRows.get()) { 941 fail("This cp should fail because the target lock is blocked by previous put"); 942 } 943 } 944 } 945 946 /** 947 * A test case for issue HBASE-17482 After combile seqid with mvcc readpoint, seqid/mvcc is 948 * acquired and stamped onto cells in the append thread, a countdown latch is used to ensure that 949 * happened before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) make 950 * the seqid/mvcc acquirement in handler thread and stamping in append thread No countdown latch 951 * to assure cells in memstore are stamped with seqid/mvcc. If cells without mvcc(A.K.A mvcc=0) 952 * are put into memstore, then a scanner with a smaller readpoint can see these data, which 953 * disobey the multi version concurrency control rules. This test case is to reproduce this 954 * scenario. 955 */ 956 @Test 957 public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException { 958 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 959 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 960 // put two row first to init the scanner 961 Put put = new Put(Bytes.toBytes("0")); 962 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 963 table.put(put); 964 put = new Put(Bytes.toBytes("00")); 965 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 966 table.put(put); 967 Scan scan = new Scan(); 968 scan.setTimeRange(0, Long.MAX_VALUE); 969 scan.setCaching(1); 970 ResultScanner scanner = table.getScanner(scan); 971 int rowNum = scanner.next() != null ? 1 : 0; 972 // the started scanner shouldn't see the rows put below 973 for (int i = 1; i < 1000; i++) { 974 put = new Put(Bytes.toBytes(String.valueOf(i))); 975 put.setDurability(Durability.ASYNC_WAL); 976 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i)); 977 table.put(put); 978 } 979 for (Result result : scanner) { 980 rowNum++; 981 } 982 // scanner should only see two rows 983 assertEquals(2, rowNum); 984 scanner = table.getScanner(scan); 985 rowNum = 0; 986 for (Result result : scanner) { 987 rowNum++; 988 } 989 // the new scanner should see all rows 990 assertEquals(1001, rowNum); 991 } 992 } 993 994 @Test 995 public void testPutThenGetWithMultipleThreads() throws Exception { 996 final int THREAD_NUM = 20; 997 final int ROUND_NUM = 10; 998 for (int round = 0; round < ROUND_NUM; round++) { 999 ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM); 1000 final AtomicInteger successCnt = new AtomicInteger(0); 1001 try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { 1002 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 1003 1004 for (int i = 0; i < THREAD_NUM; i++) { 1005 final int index = i; 1006 Thread t = new Thread(new Runnable() { 1007 1008 @Override 1009 public void run() { 1010 final byte[] row = Bytes.toBytes("row-" + index); 1011 final byte[] value = Bytes.toBytes("v" + index); 1012 try { 1013 Put put = new Put(row); 1014 put.addColumn(FAMILY, QUALIFIER, value); 1015 ht.put(put); 1016 Get get = new Get(row); 1017 Result result = ht.get(get); 1018 byte[] returnedValue = result.getValue(FAMILY, QUALIFIER); 1019 if (Bytes.equals(value, returnedValue)) { 1020 successCnt.getAndIncrement(); 1021 } else { 1022 LOG.error("Should be equal but not, original value: " + Bytes.toString(value) 1023 + ", returned value: " 1024 + (returnedValue == null ? "null" : Bytes.toString(returnedValue))); 1025 } 1026 } catch (Throwable e) { 1027 // do nothing 1028 } 1029 } 1030 }); 1031 threads.add(t); 1032 } 1033 for (Thread t : threads) { 1034 t.start(); 1035 } 1036 for (Thread t : threads) { 1037 t.join(); 1038 } 1039 assertEquals(THREAD_NUM, successCnt.get(), "Not equal in round " + round); 1040 } 1041 TEST_UTIL.deleteTable(tableName); 1042 } 1043 } 1044 1045 private static void assertNoLocks(final TableName tableName) 1046 throws IOException, InterruptedException { 1047 HRegion region = (HRegion) find(tableName); 1048 assertEquals(0, region.getLockedRows().size()); 1049 } 1050 1051 private static HRegion find(final TableName tableName) throws IOException, InterruptedException { 1052 HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); 1053 List<HRegion> regions = rs.getRegions(tableName); 1054 assertEquals(1, regions.size()); 1055 return regions.get(0); 1056 } 1057 1058 private static <T extends RegionObserver> T find(final TableName tableName, Class<T> clz) 1059 throws IOException, InterruptedException { 1060 HRegion region = find(tableName); 1061 Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); 1062 assertTrue(clz.isInstance(cp), "The cp instance should be " + clz.getName() 1063 + ", current instance is " + cp.getClass().getName()); 1064 return clz.cast(cp); 1065 } 1066 1067 public static class WaitingForMultiMutationsObserver 1068 implements RegionCoprocessor, RegionObserver { 1069 final CountDownLatch latch = new CountDownLatch(1); 1070 1071 @Override 1072 public Optional<RegionObserver> getRegionObserver() { 1073 return Optional.of(this); 1074 } 1075 1076 @Override 1077 public void postBatchMutate(final ObserverContext<? extends RegionCoprocessorEnvironment> c, 1078 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1079 try { 1080 latch.await(); 1081 } catch (InterruptedException ex) { 1082 throw new IOException(ex); 1083 } 1084 } 1085 } 1086 1087 public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver { 1088 private final CountDownLatch latch = new CountDownLatch(1); 1089 1090 @Override 1091 public Optional<RegionObserver> getRegionObserver() { 1092 return Optional.of(this); 1093 } 1094 1095 @Override 1096 public void postBatchMutate(final ObserverContext<? extends RegionCoprocessorEnvironment> c, 1097 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1098 try { 1099 // waiting for scanner 1100 latch.await(); 1101 } catch (InterruptedException ex) { 1102 throw new IOException(ex); 1103 } 1104 } 1105 1106 @Override 1107 public RegionScanner postScannerOpen( 1108 final ObserverContext<? extends RegionCoprocessorEnvironment> e, final Scan scan, 1109 final RegionScanner s) throws IOException { 1110 latch.countDown(); 1111 return s; 1112 } 1113 } 1114 1115 static byte[] generateHugeValue(int size) { 1116 Random rand = ThreadLocalRandom.current(); 1117 byte[] value = new byte[size]; 1118 for (int i = 0; i < value.length; i++) { 1119 value[i] = (byte) rand.nextInt(256); 1120 } 1121 return value; 1122 } 1123 1124 @Test 1125 public void testScanWithBatchSizeReturnIncompleteCells() 1126 throws IOException, InterruptedException { 1127 TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName) 1128 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build()) 1129 .build(); 1130 try (Table table = TEST_UTIL.createTable(hd, null)) { 1131 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 1132 1133 Put put = new Put(ROW); 1134 put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024)); 1135 table.put(put); 1136 1137 put = new Put(ROW); 1138 put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024)); 1139 table.put(put); 1140 1141 for (int i = 2; i < 5; i++) { 1142 for (int version = 0; version < 2; version++) { 1143 put = new Put(ROW); 1144 put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024)); 1145 table.put(put); 1146 } 1147 } 1148 1149 Scan scan = new Scan(); 1150 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3) 1151 .setMaxResultSize(4 * 1024 * 1024); 1152 Result result; 1153 try (ResultScanner scanner = table.getScanner(scan)) { 1154 List<Result> list = new ArrayList<>(); 1155 /* 1156 * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The 1157 * second scan rpc should return a result with 3 cells, because reach the batch limit = 3; 1158 * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the 1159 * moreResultsInRegion also would be false. Finally, the client should collect all the cells 1160 * into two result: 2+3 -> 3+2; 1161 */ 1162 while ((result = scanner.next()) != null) { 1163 list.add(result); 1164 } 1165 1166 assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1167 assertEquals(2, list.size()); 1168 assertEquals(3, list.get(0).size()); 1169 assertEquals(2, list.get(1).size()); 1170 } 1171 1172 scan = new Scan(); 1173 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2) 1174 .setMaxResultSize(4 * 1024 * 1024); 1175 try (ResultScanner scanner = table.getScanner(scan)) { 1176 List<Result> list = new ArrayList<>(); 1177 while ((result = scanner.next()) != null) { 1178 list.add(result); 1179 } 1180 assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1181 assertEquals(3, list.size()); 1182 assertEquals(2, list.get(0).size()); 1183 assertEquals(2, list.get(1).size()); 1184 assertEquals(1, list.get(2).size()); 1185 } 1186 } 1187 } 1188}