001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.List; 031import java.util.Optional; 032import java.util.Random; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicInteger; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.Coprocessor; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtility; 045import org.apache.hadoop.hbase.HColumnDescriptor; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.HTableDescriptor; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 051import org.apache.hadoop.hbase.coprocessor.ObserverContext; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 054import org.apache.hadoop.hbase.coprocessor.RegionObserver; 055import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 056import org.apache.hadoop.hbase.ipc.ServerRpcController; 057import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; 058import org.apache.hadoop.hbase.regionserver.HRegion; 059import org.apache.hadoop.hbase.regionserver.HRegionServer; 060import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 061import org.apache.hadoop.hbase.regionserver.RegionScanner; 062import org.apache.hadoop.hbase.testclassification.ClientTests; 063import org.apache.hadoop.hbase.testclassification.LargeTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.apache.hadoop.hbase.util.Pair; 067import org.junit.After; 068import org.junit.AfterClass; 069import org.junit.Assert; 070import org.junit.Before; 071import org.junit.BeforeClass; 072import org.junit.ClassRule; 073import org.junit.Rule; 074import org.junit.Test; 075import org.junit.experimental.categories.Category; 076import org.junit.rules.TestName; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 082 083@Category({ LargeTests.class, ClientTests.class }) 084public class TestFromClientSide3 { 085 086 @ClassRule 087 public static final HBaseClassTestRule CLASS_RULE = 088 HBaseClassTestRule.forClass(TestFromClientSide3.class); 089 090 private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class); 091 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 092 private static final int WAITTABLE_MILLIS = 10000; 093 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 094 private static int SLAVES = 3; 095 private static final byte[] ROW = Bytes.toBytes("testRow"); 096 private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow"); 097 private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 098 private static final byte[] VALUE = Bytes.toBytes("testValue"); 099 private static final byte[] COL_QUAL = Bytes.toBytes("f1"); 100 private static final byte[] VAL_BYTES = Bytes.toBytes("v1"); 101 private static final byte[] ROW_BYTES = Bytes.toBytes("r1"); 102 103 @Rule 104 public TestName name = new TestName(); 105 private TableName tableName; 106 107 /** 108 * @throws java.lang.Exception 109 */ 110 @BeforeClass 111 public static void setUpBeforeClass() throws Exception { 112 TEST_UTIL.startMiniCluster(SLAVES); 113 } 114 115 /** 116 * @throws java.lang.Exception 117 */ 118 @AfterClass 119 public static void tearDownAfterClass() throws Exception { 120 TEST_UTIL.shutdownMiniCluster(); 121 } 122 123 /** 124 * @throws java.lang.Exception 125 */ 126 @Before 127 public void setUp() throws Exception { 128 tableName = TableName.valueOf(name.getMethodName()); 129 } 130 131 /** 132 * @throws java.lang.Exception 133 */ 134 @After 135 public void tearDown() throws Exception { 136 for (HTableDescriptor htd : TEST_UTIL.getAdmin().listTables()) { 137 LOG.info("Tear down, remove table=" + htd.getTableName()); 138 TEST_UTIL.deleteTable(htd.getTableName()); 139 } 140 } 141 142 private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts) throws Exception { 143 Put put = new Put(row); 144 Random rand = ThreadLocalRandom.current(); 145 for (int i = 0; i < nPuts; i++) { 146 byte[] qualifier = Bytes.toBytes(rand.nextInt()); 147 byte[] value = Bytes.toBytes(rand.nextInt()); 148 put.addColumn(family, qualifier, value); 149 } 150 table.put(put); 151 } 152 153 private void performMultiplePutAndFlush(HBaseAdmin admin, Table table, byte[] row, byte[] family, 154 int nFlushes, int nPuts) throws Exception { 155 156 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(table.getName())) { 157 // connection needed for poll-wait 158 HRegionLocation loc = locator.getRegionLocation(row, true); 159 AdminProtos.AdminService.BlockingInterface server = 160 ((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName()); 161 byte[] regName = loc.getRegionInfo().getRegionName(); 162 163 for (int i = 0; i < nFlushes; i++) { 164 randomCFPuts(table, row, family, nPuts); 165 List<String> sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY); 166 int sfCount = sf.size(); 167 168 admin.flush(table.getName()); 169 } 170 } 171 } 172 173 private static List<Cell> toList(ResultScanner scanner) { 174 try { 175 List<Cell> cells = new ArrayList<>(); 176 for (Result r : scanner) { 177 cells.addAll(r.listCells()); 178 } 179 return cells; 180 } finally { 181 scanner.close(); 182 } 183 } 184 185 @Test 186 public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException { 187 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 188 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 189 byte[] row = Bytes.toBytes("SpecifiedRow"); 190 byte[] value0 = Bytes.toBytes("value_0"); 191 byte[] value1 = Bytes.toBytes("value_1"); 192 Put put = new Put(row); 193 put.addColumn(FAMILY, QUALIFIER, VALUE); 194 table.put(put); 195 Delete d = new Delete(row); 196 table.delete(d); 197 put = new Put(row); 198 put.addColumn(FAMILY, null, value0); 199 table.put(put); 200 put = new Put(row); 201 put.addColumn(FAMILY, null, value1); 202 table.put(put); 203 List<Cell> cells = toList(table.getScanner(new Scan())); 204 assertEquals(1, cells.size()); 205 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 206 207 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 208 assertEquals(1, cells.size()); 209 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 210 211 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 212 assertEquals(0, cells.size()); 213 214 TEST_UTIL.getAdmin().flush(tableName); 215 cells = toList(table.getScanner(new Scan())); 216 assertEquals(1, cells.size()); 217 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 218 219 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 220 assertEquals(1, cells.size()); 221 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 222 223 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 224 assertEquals(0, cells.size()); 225 } 226 } 227 228 @Test 229 public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException { 230 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 231 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 232 byte[] row = Bytes.toBytes("SpecifiedRow"); 233 byte[] qual0 = Bytes.toBytes("qual0"); 234 byte[] qual1 = Bytes.toBytes("qual1"); 235 long now = EnvironmentEdgeManager.currentTime(); 236 Delete d = new Delete(row, now); 237 table.delete(d); 238 239 Put put = new Put(row); 240 put.addColumn(FAMILY, null, now + 1, VALUE); 241 table.put(put); 242 243 put = new Put(row); 244 put.addColumn(FAMILY, qual1, now + 2, qual1); 245 table.put(put); 246 247 put = new Put(row); 248 put.addColumn(FAMILY, qual0, now + 3, qual0); 249 table.put(put); 250 251 Result r = table.get(new Get(row)); 252 assertEquals(r.toString(), 3, r.size()); 253 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 254 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 255 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 256 257 TEST_UTIL.getAdmin().flush(tableName); 258 r = table.get(new Get(row)); 259 assertEquals(3, r.size()); 260 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 261 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 262 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 263 } 264 } 265 266 // override the config settings at the CF level and ensure priority 267 @Test 268 public void testAdvancedConfigOverride() throws Exception { 269 /* 270 * Overall idea: (1) create 3 store files and issue a compaction. config's compaction.min == 3, 271 * so should work. (2) Increase the compaction.min toggle in the HTD to 5 and modify table. If 272 * we use the HTD value instead of the default config value, adding 3 files and issuing a 273 * compaction SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2 and modify 274 * table. The CF schema should override the Table schema and now cause a minor compaction. 275 */ 276 TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3); 277 278 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 279 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 280 try (Admin admin = TEST_UTIL.getAdmin()) { 281 ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection(); 282 283 // Create 3 store files. 284 byte[] row = Bytes.toBytes(ThreadLocalRandom.current().nextInt()); 285 performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 100); 286 287 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 288 // Verify we have multiple store files. 289 HRegionLocation loc = locator.getRegionLocation(row, true); 290 byte[] regionName = loc.getRegionInfo().getRegionName(); 291 AdminProtos.AdminService.BlockingInterface server = 292 connection.getAdmin(loc.getServerName()); 293 assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1); 294 295 // Issue a compaction request 296 admin.compact(tableName); 297 298 // poll wait for the compactions to happen 299 for (int i = 0; i < 10 * 1000 / 40; ++i) { 300 // The number of store files after compaction should be lesser. 301 loc = locator.getRegionLocation(row, true); 302 if (!loc.getRegionInfo().isOffline()) { 303 regionName = loc.getRegionInfo().getRegionName(); 304 server = connection.getAdmin(loc.getServerName()); 305 if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1) { 306 break; 307 } 308 } 309 Thread.sleep(40); 310 } 311 // verify the compactions took place and that we didn't just time out 312 assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1); 313 314 // change the compaction.min config option for this table to 5 315 LOG.info("hbase.hstore.compaction.min should now be 5"); 316 HTableDescriptor htd = new HTableDescriptor(table.getTableDescriptor()); 317 htd.setValue("hbase.hstore.compaction.min", String.valueOf(5)); 318 admin.modifyTable(tableName, htd); 319 Pair<Integer, Integer> st = admin.getAlterStatus(tableName); 320 while (null != st && st.getFirst() > 0) { 321 LOG.debug(st.getFirst() + " regions left to update"); 322 Thread.sleep(40); 323 st = admin.getAlterStatus(tableName); 324 } 325 LOG.info("alter status finished"); 326 327 // Create 3 more store files. 328 performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 10); 329 330 // Issue a compaction request 331 admin.compact(tableName); 332 333 // This time, the compaction request should not happen 334 Thread.sleep(10 * 1000); 335 loc = locator.getRegionLocation(row, true); 336 regionName = loc.getRegionInfo().getRegionName(); 337 server = connection.getAdmin(loc.getServerName()); 338 int sfCount = ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size(); 339 assertTrue(sfCount > 1); 340 341 // change an individual CF's config option to 2 & online schema update 342 LOG.info("hbase.hstore.compaction.min should now be 2"); 343 HColumnDescriptor hcd = new HColumnDescriptor(htd.getFamily(FAMILY)); 344 hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2)); 345 htd.modifyFamily(hcd); 346 admin.modifyTable(tableName, htd); 347 st = admin.getAlterStatus(tableName); 348 while (null != st && st.getFirst() > 0) { 349 LOG.debug(st.getFirst() + " regions left to update"); 350 Thread.sleep(40); 351 st = admin.getAlterStatus(tableName); 352 } 353 LOG.info("alter status finished"); 354 355 // Issue a compaction request 356 admin.compact(tableName); 357 358 // poll wait for the compactions to happen 359 for (int i = 0; i < 10 * 1000 / 40; ++i) { 360 loc = locator.getRegionLocation(row, true); 361 regionName = loc.getRegionInfo().getRegionName(); 362 try { 363 server = connection.getAdmin(loc.getServerName()); 364 if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount) { 365 break; 366 } 367 } catch (Exception e) { 368 LOG.debug("Waiting for region to come online: " + Bytes.toString(regionName)); 369 } 370 Thread.sleep(40); 371 } 372 373 // verify the compaction took place and that we didn't just time out 374 assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount); 375 376 // Finally, ensure that we can remove a custom config value after we made it 377 LOG.info("Removing CF config value"); 378 LOG.info("hbase.hstore.compaction.min should now be 5"); 379 hcd = new HColumnDescriptor(htd.getFamily(FAMILY)); 380 hcd.setValue("hbase.hstore.compaction.min", null); 381 htd.modifyFamily(hcd); 382 admin.modifyTable(tableName, htd); 383 st = admin.getAlterStatus(tableName); 384 while (null != st && st.getFirst() > 0) { 385 LOG.debug(st.getFirst() + " regions left to update"); 386 Thread.sleep(40); 387 st = admin.getAlterStatus(tableName); 388 } 389 LOG.info("alter status finished"); 390 assertNull( 391 table.getTableDescriptor().getFamily(FAMILY).getValue("hbase.hstore.compaction.min")); 392 } 393 } 394 } 395 } 396 397 @Test 398 public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException { 399 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 400 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 401 List actions = (List) new ArrayList(); 402 Object[] results = new Object[2]; 403 // create an empty Put 404 Put put1 = new Put(ROW); 405 actions.add(put1); 406 407 Put put2 = new Put(ANOTHERROW); 408 put2.addColumn(FAMILY, QUALIFIER, VALUE); 409 actions.add(put2); 410 411 table.batch(actions, results); 412 fail("Empty Put should have failed the batch call"); 413 } catch (IllegalArgumentException iae) { 414 } 415 } 416 417 // Test Table.batch with large amount of mutations against the same key. 418 // It used to trigger read lock's "Maximum lock count exceeded" Error. 419 @Test 420 public void testHTableWithLargeBatch() throws IOException, InterruptedException { 421 int sixtyFourK = 64 * 1024; 422 List actions = new ArrayList(); 423 Object[] results = new Object[(sixtyFourK + 1) * 2]; 424 425 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 426 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 427 428 for (int i = 0; i < sixtyFourK + 1; i++) { 429 Put put1 = new Put(ROW); 430 put1.addColumn(FAMILY, QUALIFIER, VALUE); 431 actions.add(put1); 432 433 Put put2 = new Put(ANOTHERROW); 434 put2.addColumn(FAMILY, QUALIFIER, VALUE); 435 actions.add(put2); 436 } 437 438 table.batch(actions, results); 439 } 440 } 441 442 @Test 443 public void testBatchWithRowMutation() throws Exception { 444 LOG.info("Starting testBatchWithRowMutation"); 445 byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") }; 446 447 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 448 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 449 450 RowMutations arm = RowMutations 451 .of(Collections.singletonList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE))); 452 Object[] batchResult = new Object[1]; 453 table.batch(Arrays.asList(arm), batchResult); 454 455 Get g = new Get(ROW); 456 Result r = table.get(g); 457 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); 458 459 arm = RowMutations.of(Arrays.asList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE), 460 new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0]))); 461 table.batch(Arrays.asList(arm), batchResult); 462 r = table.get(g); 463 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); 464 assertNull(r.getValue(FAMILY, QUALIFIERS[0])); 465 466 // Test that we get the correct remote exception for RowMutations from batch() 467 try { 468 arm = RowMutations.of(Collections.singletonList( 469 new Put(ROW).addColumn(new byte[] { 'b', 'o', 'g', 'u', 's' }, QUALIFIERS[0], VALUE))); 470 table.batch(Arrays.asList(arm), batchResult); 471 fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException"); 472 } catch (RetriesExhaustedWithDetailsException e) { 473 String msg = e.getMessage(); 474 assertTrue(msg.contains("NoSuchColumnFamilyException")); 475 } 476 } 477 } 478 479 @Test 480 public void testBatchWithCheckAndMutate() throws Exception { 481 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 482 byte[] row1 = Bytes.toBytes("row1"); 483 byte[] row2 = Bytes.toBytes("row2"); 484 byte[] row3 = Bytes.toBytes("row3"); 485 byte[] row4 = Bytes.toBytes("row4"); 486 byte[] row5 = Bytes.toBytes("row5"); 487 byte[] row6 = Bytes.toBytes("row6"); 488 byte[] row7 = Bytes.toBytes("row7"); 489 490 table 491 .put(Arrays.asList(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 492 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 493 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 494 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), 495 new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), 496 new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)), 497 new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))); 498 499 CheckAndMutate checkAndMutate1 = 500 CheckAndMutate.newBuilder(row1).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 501 .build(new RowMutations(row1) 502 .add((Mutation) new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g"))) 503 .add((Mutation) new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A"))) 504 .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L)) 505 .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); 506 Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); 507 RowMutations mutations = new RowMutations(row3) 508 .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) 509 .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 510 .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L)) 511 .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 512 CheckAndMutate checkAndMutate2 = 513 CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) 514 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); 515 Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); 516 CheckAndMutate checkAndMutate3 = 517 CheckAndMutate.newBuilder(row6).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)) 518 .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1)); 519 CheckAndMutate checkAndMutate4 = 520 CheckAndMutate.newBuilder(row7).ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")) 521 .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))); 522 523 List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put, 524 checkAndMutate3, checkAndMutate4); 525 Object[] results = new Object[actions.size()]; 526 table.batch(actions, results); 527 528 CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0]; 529 assertTrue(checkAndMutateResult.isSuccess()); 530 assertEquals(3L, 531 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C")))); 532 assertEquals("d", 533 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D")))); 534 535 assertEquals("b", Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B")))); 536 537 Result result = (Result) results[2]; 538 assertTrue(result.getExists()); 539 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 540 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 541 542 checkAndMutateResult = (CheckAndMutateResult) results[3]; 543 assertFalse(checkAndMutateResult.isSuccess()); 544 assertNull(checkAndMutateResult.getResult()); 545 546 assertTrue(((Result) results[4]).isEmpty()); 547 548 checkAndMutateResult = (CheckAndMutateResult) results[5]; 549 assertTrue(checkAndMutateResult.isSuccess()); 550 assertEquals(11, 551 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("F")))); 552 553 checkAndMutateResult = (CheckAndMutateResult) results[6]; 554 assertTrue(checkAndMutateResult.isSuccess()); 555 assertEquals("gg", 556 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("G")))); 557 558 result = table.get(new Get(row1)); 559 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 560 assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); 561 assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); 562 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 563 564 result = table.get(new Get(row3)); 565 assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); 566 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 567 assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 568 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 569 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 570 571 result = table.get(new Get(row4)); 572 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 573 574 result = table.get(new Get(row5)); 575 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); 576 577 result = table.get(new Get(row6)); 578 assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F")))); 579 580 result = table.get(new Get(row7)); 581 assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G")))); 582 } 583 } 584 585 @Test 586 public void testHTableExistsMethodSingleRegionSingleGet() 587 throws IOException, InterruptedException { 588 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 589 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 590 591 // Test with a single region table. 592 Put put = new Put(ROW); 593 put.addColumn(FAMILY, QUALIFIER, VALUE); 594 595 Get get = new Get(ROW); 596 597 boolean exist = table.exists(get); 598 assertFalse(exist); 599 600 table.put(put); 601 602 exist = table.exists(get); 603 assertTrue(exist); 604 } 605 } 606 607 @Test 608 public void testHTableExistsMethodSingleRegionMultipleGets() 609 throws IOException, InterruptedException { 610 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 611 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 612 613 Put put = new Put(ROW); 614 put.addColumn(FAMILY, QUALIFIER, VALUE); 615 table.put(put); 616 617 List<Get> gets = new ArrayList<>(); 618 gets.add(new Get(ROW)); 619 gets.add(new Get(ANOTHERROW)); 620 621 boolean[] results = table.exists(gets); 622 assertTrue(results[0]); 623 assertFalse(results[1]); 624 } 625 } 626 627 @Test 628 public void testHTableExistsBeforeGet() throws IOException, InterruptedException { 629 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 630 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 631 632 Put put = new Put(ROW); 633 put.addColumn(FAMILY, QUALIFIER, VALUE); 634 table.put(put); 635 636 Get get = new Get(ROW); 637 638 boolean exist = table.exists(get); 639 assertEquals(true, exist); 640 641 Result result = table.get(get); 642 assertEquals(false, result.isEmpty()); 643 assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER))); 644 } 645 } 646 647 @Test 648 public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException { 649 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 650 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 651 652 final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2")); 653 Put put = new Put(ROW); 654 put.addColumn(FAMILY, QUALIFIER, VALUE); 655 table.put(put); 656 put = new Put(ROW2); 657 put.addColumn(FAMILY, QUALIFIER, VALUE); 658 table.put(put); 659 660 Get get = new Get(ROW); 661 Get get2 = new Get(ROW2); 662 ArrayList<Get> getList = new ArrayList(2); 663 getList.add(get); 664 getList.add(get2); 665 666 boolean[] exists = table.existsAll(getList); 667 assertEquals(true, exists[0]); 668 assertEquals(true, exists[1]); 669 670 Result[] result = table.get(getList); 671 assertEquals(false, result[0].isEmpty()); 672 assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER))); 673 assertEquals(false, result[1].isEmpty()); 674 assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER))); 675 } 676 } 677 678 @Test 679 public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception { 680 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 681 new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { 682 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 683 684 Put put = new Put(ROW); 685 put.addColumn(FAMILY, QUALIFIER, VALUE); 686 687 Get get = new Get(ROW); 688 689 boolean exist = table.exists(get); 690 assertFalse(exist); 691 692 table.put(put); 693 694 exist = table.exists(get); 695 assertTrue(exist); 696 } 697 } 698 699 @Test 700 public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception { 701 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 702 new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { 703 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 704 705 Put put = new Put(ROW); 706 put.addColumn(FAMILY, QUALIFIER, VALUE); 707 table.put(put); 708 709 List<Get> gets = new ArrayList<>(); 710 gets.add(new Get(ANOTHERROW)); 711 gets.add(new Get(Bytes.add(ROW, new byte[] { 0x00 }))); 712 gets.add(new Get(ROW)); 713 gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 }))); 714 715 LOG.info("Calling exists"); 716 boolean[] results = table.existsAll(gets); 717 assertFalse(results[0]); 718 assertFalse(results[1]); 719 assertTrue(results[2]); 720 assertFalse(results[3]); 721 722 // Test with the first region. 723 put = new Put(new byte[] { 0x00 }); 724 put.addColumn(FAMILY, QUALIFIER, VALUE); 725 table.put(put); 726 727 gets = new ArrayList<>(); 728 gets.add(new Get(new byte[] { 0x00 })); 729 gets.add(new Get(new byte[] { 0x00, 0x00 })); 730 results = table.existsAll(gets); 731 assertTrue(results[0]); 732 assertFalse(results[1]); 733 734 // Test with the last region 735 put = new Put(new byte[] { (byte) 0xff, (byte) 0xff }); 736 put.addColumn(FAMILY, QUALIFIER, VALUE); 737 table.put(put); 738 739 gets = new ArrayList<>(); 740 gets.add(new Get(new byte[] { (byte) 0xff })); 741 gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff })); 742 gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff })); 743 results = table.existsAll(gets); 744 assertFalse(results[0]); 745 assertTrue(results[1]); 746 assertFalse(results[2]); 747 } 748 } 749 750 @Test 751 public void testGetEmptyRow() throws Exception { 752 // Create a table and put in 1 row 753 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 754 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 755 756 Put put = new Put(ROW_BYTES); 757 put.addColumn(FAMILY, COL_QUAL, VAL_BYTES); 758 table.put(put); 759 760 // Try getting the row with an empty row key 761 Result res = null; 762 try { 763 res = table.get(new Get(new byte[0])); 764 fail(); 765 } catch (IllegalArgumentException e) { 766 // Expected. 767 } 768 assertTrue(res == null); 769 res = table.get(new Get(Bytes.toBytes("r1-not-exist"))); 770 assertTrue(res.isEmpty() == true); 771 res = table.get(new Get(ROW_BYTES)); 772 assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); 773 } 774 } 775 776 @Test 777 public void testConnectionDefaultUsesCodec() throws Exception { 778 ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection(); 779 assertTrue(con.hasCellBlockSupport()); 780 } 781 782 @Test 783 public void testPutWithPreBatchMutate() throws Exception { 784 testPreBatchMutate(tableName, () -> { 785 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 786 Put put = new Put(ROW); 787 put.addColumn(FAMILY, QUALIFIER, VALUE); 788 t.put(put); 789 } catch (IOException ex) { 790 throw new RuntimeException(ex); 791 } 792 }); 793 } 794 795 @Test 796 public void testRowMutationsWithPreBatchMutate() throws Exception { 797 testPreBatchMutate(tableName, () -> { 798 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 799 RowMutations rm = new RowMutations(ROW, 1); 800 Put put = new Put(ROW); 801 put.addColumn(FAMILY, QUALIFIER, VALUE); 802 rm.add(put); 803 t.mutateRow(rm); 804 } catch (IOException ex) { 805 throw new RuntimeException(ex); 806 } 807 }); 808 } 809 810 private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception { 811 HTableDescriptor desc = new HTableDescriptor(tableName); 812 desc.addCoprocessor(WaitingForScanObserver.class.getName()); 813 desc.addFamily(new HColumnDescriptor(FAMILY)); 814 TEST_UTIL.getAdmin().createTable(desc); 815 // Don't use waitTableAvailable(), because the scanner will mess up the co-processor 816 817 ExecutorService service = Executors.newFixedThreadPool(2); 818 service.execute(rn); 819 final List<Cell> cells = new ArrayList<>(); 820 service.execute(() -> { 821 try { 822 // waiting for update. 823 TimeUnit.SECONDS.sleep(3); 824 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 825 Scan scan = new Scan(); 826 try (ResultScanner scanner = t.getScanner(scan)) { 827 for (Result r : scanner) { 828 cells.addAll(Arrays.asList(r.rawCells())); 829 } 830 } 831 } 832 } catch (IOException | InterruptedException ex) { 833 throw new RuntimeException(ex); 834 } 835 }); 836 service.shutdown(); 837 service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 838 assertEquals("The write is blocking by RegionObserver#postBatchMutate" 839 + ", so the data is invisible to reader", 0, cells.size()); 840 TEST_UTIL.deleteTable(tableName); 841 } 842 843 @Test 844 public void testLockLeakWithDelta() throws Exception, Throwable { 845 HTableDescriptor desc = new HTableDescriptor(tableName); 846 desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); 847 desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); 848 desc.addFamily(new HColumnDescriptor(FAMILY)); 849 TEST_UTIL.getAdmin().createTable(desc); 850 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 851 852 // new a connection for lower retry number. 853 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 854 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 855 try (Connection con = ConnectionFactory.createConnection(copy)) { 856 HRegion region = (HRegion) find(tableName); 857 region.setTimeoutForWriteLock(10); 858 ExecutorService putService = Executors.newSingleThreadExecutor(); 859 putService.execute(() -> { 860 try (Table table = con.getTable(tableName)) { 861 Put put = new Put(ROW); 862 put.addColumn(FAMILY, QUALIFIER, VALUE); 863 // the put will be blocked by WaitingForMultiMutationsObserver. 864 table.put(put); 865 } catch (IOException ex) { 866 throw new RuntimeException(ex); 867 } 868 }); 869 ExecutorService appendService = Executors.newSingleThreadExecutor(); 870 appendService.execute(() -> { 871 Append append = new Append(ROW); 872 append.addColumn(FAMILY, QUALIFIER, VALUE); 873 try (Table table = con.getTable(tableName)) { 874 table.append(append); 875 fail("The APPEND should fail because the target lock is blocked by previous put"); 876 } catch (Exception ex) { 877 } 878 }); 879 appendService.shutdown(); 880 appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 881 WaitingForMultiMutationsObserver observer = 882 find(tableName, WaitingForMultiMutationsObserver.class); 883 observer.latch.countDown(); 884 putService.shutdown(); 885 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 886 try (Table table = con.getTable(tableName)) { 887 Result r = table.get(new Get(ROW)); 888 assertFalse(r.isEmpty()); 889 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE)); 890 } 891 } 892 HRegion region = (HRegion) find(tableName); 893 int readLockCount = region.getReadLockCount(); 894 LOG.info("readLockCount:" + readLockCount); 895 assertEquals(0, readLockCount); 896 } 897 898 @Test 899 public void testMultiRowMutations() throws Exception, Throwable { 900 HTableDescriptor desc = new HTableDescriptor(tableName); 901 desc.addCoprocessor(MultiRowMutationEndpoint.class.getName()); 902 desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); 903 desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); 904 desc.addFamily(new HColumnDescriptor(FAMILY)); 905 TEST_UTIL.getAdmin().createTable(desc); 906 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 907 908 // new a connection for lower retry number. 909 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 910 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 911 try (Connection con = ConnectionFactory.createConnection(copy)) { 912 byte[] row = Bytes.toBytes("ROW-0"); 913 byte[] rowLocked = Bytes.toBytes("ROW-1"); 914 byte[] value0 = Bytes.toBytes("VALUE-0"); 915 byte[] value1 = Bytes.toBytes("VALUE-1"); 916 byte[] value2 = Bytes.toBytes("VALUE-2"); 917 assertNoLocks(tableName); 918 ExecutorService putService = Executors.newSingleThreadExecutor(); 919 putService.execute(() -> { 920 try (Table table = con.getTable(tableName)) { 921 Put put0 = new Put(rowLocked); 922 put0.addColumn(FAMILY, QUALIFIER, value0); 923 // the put will be blocked by WaitingForMultiMutationsObserver. 924 table.put(put0); 925 } catch (IOException ex) { 926 throw new RuntimeException(ex); 927 } 928 }); 929 ExecutorService cpService = Executors.newSingleThreadExecutor(); 930 cpService.execute(() -> { 931 boolean threw; 932 Put put1 = new Put(row); 933 Put put2 = new Put(rowLocked); 934 put1.addColumn(FAMILY, QUALIFIER, value1); 935 put2.addColumn(FAMILY, QUALIFIER, value2); 936 try (Table table = con.getTable(tableName)) { 937 MultiRowMutationProtos.MutateRowsRequest request = 938 MultiRowMutationProtos.MutateRowsRequest.newBuilder() 939 .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 940 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, 941 put1)) 942 .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 943 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, 944 put2)) 945 .build(); 946 table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, ROW, ROW, 947 (MultiRowMutationProtos.MultiRowMutationService exe) -> { 948 ServerRpcController controller = new ServerRpcController(); 949 CoprocessorRpcUtils.BlockingRpcCallback< 950 MultiRowMutationProtos.MutateRowsResponse> rpcCallback = 951 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 952 exe.mutateRows(controller, request, rpcCallback); 953 return rpcCallback.get(); 954 }); 955 threw = false; 956 } catch (Throwable ex) { 957 threw = true; 958 } 959 if (!threw) { 960 // Can't call fail() earlier because the catch would eat it. 961 fail("This cp should fail because the target lock is blocked by previous put"); 962 } 963 }); 964 cpService.shutdown(); 965 cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 966 WaitingForMultiMutationsObserver observer = 967 find(tableName, WaitingForMultiMutationsObserver.class); 968 observer.latch.countDown(); 969 putService.shutdown(); 970 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 971 try (Table table = con.getTable(tableName)) { 972 Get g0 = new Get(row); 973 Get g1 = new Get(rowLocked); 974 Result r0 = table.get(g0); 975 Result r1 = table.get(g1); 976 assertTrue(r0.isEmpty()); 977 assertFalse(r1.isEmpty()); 978 assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); 979 } 980 assertNoLocks(tableName); 981 } 982 } 983 984 /** 985 * A test case for issue HBASE-17482 After combile seqid with mvcc readpoint, seqid/mvcc is 986 * acquired and stamped onto cells in the append thread, a countdown latch is used to ensure that 987 * happened before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) make 988 * the seqid/mvcc acquirement in handler thread and stamping in append thread No countdown latch 989 * to assure cells in memstore are stamped with seqid/mvcc. If cells without mvcc(A.K.A mvcc=0) 990 * are put into memstore, then a scanner with a smaller readpoint can see these data, which 991 * disobey the multi version concurrency control rules. This test case is to reproduce this 992 * scenario. n 993 */ 994 @Test 995 public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException { 996 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 997 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 998 // put two row first to init the scanner 999 Put put = new Put(Bytes.toBytes("0")); 1000 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 1001 table.put(put); 1002 put = new Put(Bytes.toBytes("00")); 1003 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 1004 table.put(put); 1005 Scan scan = new Scan(); 1006 scan.setTimeRange(0, Long.MAX_VALUE); 1007 scan.setCaching(1); 1008 ResultScanner scanner = table.getScanner(scan); 1009 int rowNum = scanner.next() != null ? 1 : 0; 1010 // the started scanner shouldn't see the rows put below 1011 for (int i = 1; i < 1000; i++) { 1012 put = new Put(Bytes.toBytes(String.valueOf(i))); 1013 put.setDurability(Durability.ASYNC_WAL); 1014 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i)); 1015 table.put(put); 1016 } 1017 for (Result result : scanner) { 1018 rowNum++; 1019 } 1020 // scanner should only see two rows 1021 assertEquals(2, rowNum); 1022 scanner = table.getScanner(scan); 1023 rowNum = 0; 1024 for (Result result : scanner) { 1025 rowNum++; 1026 } 1027 // the new scanner should see all rows 1028 assertEquals(1001, rowNum); 1029 } 1030 } 1031 1032 @Test 1033 public void testPutThenGetWithMultipleThreads() throws Exception { 1034 final int THREAD_NUM = 20; 1035 final int ROUND_NUM = 10; 1036 for (int round = 0; round < ROUND_NUM; round++) { 1037 ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM); 1038 final AtomicInteger successCnt = new AtomicInteger(0); 1039 try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { 1040 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 1041 1042 for (int i = 0; i < THREAD_NUM; i++) { 1043 final int index = i; 1044 Thread t = new Thread(new Runnable() { 1045 1046 @Override 1047 public void run() { 1048 final byte[] row = Bytes.toBytes("row-" + index); 1049 final byte[] value = Bytes.toBytes("v" + index); 1050 try { 1051 Put put = new Put(row); 1052 put.addColumn(FAMILY, QUALIFIER, value); 1053 ht.put(put); 1054 Get get = new Get(row); 1055 Result result = ht.get(get); 1056 byte[] returnedValue = result.getValue(FAMILY, QUALIFIER); 1057 if (Bytes.equals(value, returnedValue)) { 1058 successCnt.getAndIncrement(); 1059 } else { 1060 LOG.error("Should be equal but not, original value: " + Bytes.toString(value) 1061 + ", returned value: " 1062 + (returnedValue == null ? "null" : Bytes.toString(returnedValue))); 1063 } 1064 } catch (Throwable e) { 1065 // do nothing 1066 } 1067 } 1068 }); 1069 threads.add(t); 1070 } 1071 for (Thread t : threads) { 1072 t.start(); 1073 } 1074 for (Thread t : threads) { 1075 t.join(); 1076 } 1077 assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get()); 1078 } 1079 TEST_UTIL.deleteTable(tableName); 1080 } 1081 } 1082 1083 private static void assertNoLocks(final TableName tableName) 1084 throws IOException, InterruptedException { 1085 HRegion region = (HRegion) find(tableName); 1086 assertEquals(0, region.getLockedRows().size()); 1087 } 1088 1089 private static HRegion find(final TableName tableName) throws IOException, InterruptedException { 1090 HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); 1091 List<HRegion> regions = rs.getRegions(tableName); 1092 assertEquals(1, regions.size()); 1093 return regions.get(0); 1094 } 1095 1096 private static <T extends RegionObserver> T find(final TableName tableName, Class<T> clz) 1097 throws IOException, InterruptedException { 1098 HRegion region = find(tableName); 1099 Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); 1100 assertTrue("The cp instance should be " + clz.getName() + ", current instance is " 1101 + cp.getClass().getName(), clz.isInstance(cp)); 1102 return clz.cast(cp); 1103 } 1104 1105 public static class WaitingForMultiMutationsObserver 1106 implements RegionCoprocessor, RegionObserver { 1107 final CountDownLatch latch = new CountDownLatch(1); 1108 1109 @Override 1110 public Optional<RegionObserver> getRegionObserver() { 1111 return Optional.of(this); 1112 } 1113 1114 @Override 1115 public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, 1116 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1117 try { 1118 latch.await(); 1119 } catch (InterruptedException ex) { 1120 throw new IOException(ex); 1121 } 1122 } 1123 } 1124 1125 public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver { 1126 private final CountDownLatch latch = new CountDownLatch(1); 1127 1128 @Override 1129 public Optional<RegionObserver> getRegionObserver() { 1130 return Optional.of(this); 1131 } 1132 1133 @Override 1134 public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, 1135 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1136 try { 1137 // waiting for scanner 1138 latch.await(); 1139 } catch (InterruptedException ex) { 1140 throw new IOException(ex); 1141 } 1142 } 1143 1144 @Override 1145 public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, 1146 final Scan scan, final RegionScanner s) throws IOException { 1147 latch.countDown(); 1148 return s; 1149 } 1150 } 1151 1152 static byte[] generateHugeValue(int size) { 1153 Random rand = ThreadLocalRandom.current(); 1154 byte[] value = new byte[size]; 1155 for (int i = 0; i < value.length; i++) { 1156 value[i] = (byte) rand.nextInt(256); 1157 } 1158 return value; 1159 } 1160 1161 @Test 1162 public void testScanWithBatchSizeReturnIncompleteCells() 1163 throws IOException, InterruptedException { 1164 TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName) 1165 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build()) 1166 .build(); 1167 try (Table table = TEST_UTIL.createTable(hd, null)) { 1168 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 1169 1170 Put put = new Put(ROW); 1171 put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024)); 1172 table.put(put); 1173 1174 put = new Put(ROW); 1175 put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024)); 1176 table.put(put); 1177 1178 for (int i = 2; i < 5; i++) { 1179 for (int version = 0; version < 2; version++) { 1180 put = new Put(ROW); 1181 put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024)); 1182 table.put(put); 1183 } 1184 } 1185 1186 Scan scan = new Scan(); 1187 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3) 1188 .setMaxResultSize(4 * 1024 * 1024); 1189 Result result; 1190 try (ResultScanner scanner = table.getScanner(scan)) { 1191 List<Result> list = new ArrayList<>(); 1192 /* 1193 * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The 1194 * second scan rpc should return a result with 3 cells, because reach the batch limit = 3; 1195 * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the 1196 * moreResultsInRegion also would be false. Finally, the client should collect all the cells 1197 * into two result: 2+3 -> 3+2; 1198 */ 1199 while ((result = scanner.next()) != null) { 1200 list.add(result); 1201 } 1202 1203 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1204 Assert.assertEquals(2, list.size()); 1205 Assert.assertEquals(3, list.get(0).size()); 1206 Assert.assertEquals(2, list.get(1).size()); 1207 } 1208 1209 scan = new Scan(); 1210 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2) 1211 .setMaxResultSize(4 * 1024 * 1024); 1212 try (ResultScanner scanner = table.getScanner(scan)) { 1213 List<Result> list = new ArrayList<>(); 1214 while ((result = scanner.next()) != null) { 1215 list.add(result); 1216 } 1217 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1218 Assert.assertEquals(3, list.size()); 1219 Assert.assertEquals(2, list.get(0).size()); 1220 Assert.assertEquals(2, list.get(1).size()); 1221 Assert.assertEquals(1, list.get(2).size()); 1222 } 1223 } 1224 } 1225}