001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.List; 031import java.util.Optional; 032import java.util.Random; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicInteger; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.Coprocessor; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtility; 045import org.apache.hadoop.hbase.HColumnDescriptor; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.HTableDescriptor; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 051import org.apache.hadoop.hbase.coprocessor.ObserverContext; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 054import org.apache.hadoop.hbase.coprocessor.RegionObserver; 055import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 056import org.apache.hadoop.hbase.ipc.ServerRpcController; 057import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; 058import org.apache.hadoop.hbase.regionserver.HRegion; 059import org.apache.hadoop.hbase.regionserver.HRegionServer; 060import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 061import org.apache.hadoop.hbase.regionserver.RegionScanner; 062import org.apache.hadoop.hbase.testclassification.ClientTests; 063import org.apache.hadoop.hbase.testclassification.LargeTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.Pair; 066import org.junit.After; 067import org.junit.AfterClass; 068import org.junit.Assert; 069import org.junit.Before; 070import org.junit.BeforeClass; 071import org.junit.ClassRule; 072import org.junit.Rule; 073import org.junit.Test; 074import org.junit.experimental.categories.Category; 075import org.junit.rules.TestName; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 081 082@Category({LargeTests.class, ClientTests.class}) 083public class TestFromClientSide3 { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestFromClientSide3.class); 088 089 private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class); 090 private final static HBaseTestingUtility TEST_UTIL 091 = new HBaseTestingUtility(); 092 private static final int WAITTABLE_MILLIS = 10000; 093 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 094 private static Random random = new Random(); 095 private static int SLAVES = 3; 096 private static final byte[] ROW = Bytes.toBytes("testRow"); 097 private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow"); 098 private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 099 private static final byte[] VALUE = Bytes.toBytes("testValue"); 100 private static final byte[] COL_QUAL = Bytes.toBytes("f1"); 101 private static final byte[] VAL_BYTES = Bytes.toBytes("v1"); 102 private static final byte[] ROW_BYTES = Bytes.toBytes("r1"); 103 104 @Rule 105 public TestName name = new TestName(); 106 private TableName tableName; 107 108 /** 109 * @throws java.lang.Exception 110 */ 111 @BeforeClass 112 public static void setUpBeforeClass() throws Exception { 113 TEST_UTIL.startMiniCluster(SLAVES); 114 } 115 116 /** 117 * @throws java.lang.Exception 118 */ 119 @AfterClass 120 public static void tearDownAfterClass() throws Exception { 121 TEST_UTIL.shutdownMiniCluster(); 122 } 123 124 /** 125 * @throws java.lang.Exception 126 */ 127 @Before 128 public void setUp() throws Exception { 129 tableName = TableName.valueOf(name.getMethodName()); 130 } 131 132 /** 133 * @throws java.lang.Exception 134 */ 135 @After 136 public void tearDown() throws Exception { 137 for (HTableDescriptor htd: TEST_UTIL.getAdmin().listTables()) { 138 LOG.info("Tear down, remove table=" + htd.getTableName()); 139 TEST_UTIL.deleteTable(htd.getTableName()); 140 } 141 } 142 143 private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts) 144 throws Exception { 145 Put put = new Put(row); 146 for (int i = 0; i < nPuts; i++) { 147 byte[] qualifier = Bytes.toBytes(random.nextInt()); 148 byte[] value = Bytes.toBytes(random.nextInt()); 149 put.addColumn(family, qualifier, value); 150 } 151 table.put(put); 152 } 153 154 private void performMultiplePutAndFlush(HBaseAdmin admin, Table table, 155 byte[] row, byte[] family, int nFlushes, int nPuts) 156 throws Exception { 157 158 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(table.getName())) { 159 // connection needed for poll-wait 160 HRegionLocation loc = locator.getRegionLocation(row, true); 161 AdminProtos.AdminService.BlockingInterface server = 162 ((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName()); 163 byte[] regName = loc.getRegionInfo().getRegionName(); 164 165 for (int i = 0; i < nFlushes; i++) { 166 randomCFPuts(table, row, family, nPuts); 167 List<String> sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY); 168 int sfCount = sf.size(); 169 170 admin.flush(table.getName()); 171 } 172 } 173 } 174 175 private static List<Cell> toList(ResultScanner scanner) { 176 try { 177 List<Cell> cells = new ArrayList<>(); 178 for (Result r : scanner) { 179 cells.addAll(r.listCells()); 180 } 181 return cells; 182 } finally { 183 scanner.close(); 184 } 185 } 186 187 @Test 188 public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException { 189 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 190 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 191 byte[] row = Bytes.toBytes("SpecifiedRow"); 192 byte[] value0 = Bytes.toBytes("value_0"); 193 byte[] value1 = Bytes.toBytes("value_1"); 194 Put put = new Put(row); 195 put.addColumn(FAMILY, QUALIFIER, VALUE); 196 table.put(put); 197 Delete d = new Delete(row); 198 table.delete(d); 199 put = new Put(row); 200 put.addColumn(FAMILY, null, value0); 201 table.put(put); 202 put = new Put(row); 203 put.addColumn(FAMILY, null, value1); 204 table.put(put); 205 List<Cell> cells = toList(table.getScanner(new Scan())); 206 assertEquals(1, cells.size()); 207 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 208 209 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 210 assertEquals(1, cells.size()); 211 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 212 213 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 214 assertEquals(0, cells.size()); 215 216 TEST_UTIL.getAdmin().flush(tableName); 217 cells = toList(table.getScanner(new Scan())); 218 assertEquals(1, cells.size()); 219 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 220 221 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 222 assertEquals(1, cells.size()); 223 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 224 225 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 226 assertEquals(0, cells.size()); 227 } 228 } 229 230 @Test 231 public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException { 232 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 233 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 234 byte[] row = Bytes.toBytes("SpecifiedRow"); 235 byte[] qual0 = Bytes.toBytes("qual0"); 236 byte[] qual1 = Bytes.toBytes("qual1"); 237 long now = System.currentTimeMillis(); 238 Delete d = new Delete(row, now); 239 table.delete(d); 240 241 Put put = new Put(row); 242 put.addColumn(FAMILY, null, now + 1, VALUE); 243 table.put(put); 244 245 put = new Put(row); 246 put.addColumn(FAMILY, qual1, now + 2, qual1); 247 table.put(put); 248 249 put = new Put(row); 250 put.addColumn(FAMILY, qual0, now + 3, qual0); 251 table.put(put); 252 253 Result r = table.get(new Get(row)); 254 assertEquals(r.toString(), 3, r.size()); 255 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 256 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 257 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 258 259 TEST_UTIL.getAdmin().flush(tableName); 260 r = table.get(new Get(row)); 261 assertEquals(3, r.size()); 262 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 263 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 264 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 265 } 266 } 267 268 // override the config settings at the CF level and ensure priority 269 @Test 270 public void testAdvancedConfigOverride() throws Exception { 271 /* 272 * Overall idea: (1) create 3 store files and issue a compaction. config's 273 * compaction.min == 3, so should work. (2) Increase the compaction.min 274 * toggle in the HTD to 5 and modify table. If we use the HTD value instead 275 * of the default config value, adding 3 files and issuing a compaction 276 * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2 277 * and modify table. The CF schema should override the Table schema and now 278 * cause a minor compaction. 279 */ 280 TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3); 281 282 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 283 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 284 try (Admin admin = TEST_UTIL.getAdmin()) { 285 ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection(); 286 287 // Create 3 store files. 288 byte[] row = Bytes.toBytes(random.nextInt()); 289 performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 100); 290 291 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 292 // Verify we have multiple store files. 293 HRegionLocation loc = locator.getRegionLocation(row, true); 294 byte[] regionName = loc.getRegionInfo().getRegionName(); 295 AdminProtos.AdminService.BlockingInterface server = 296 connection.getAdmin(loc.getServerName()); 297 assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1); 298 299 // Issue a compaction request 300 admin.compact(tableName); 301 302 // poll wait for the compactions to happen 303 for (int i = 0; i < 10 * 1000 / 40; ++i) { 304 // The number of store files after compaction should be lesser. 305 loc = locator.getRegionLocation(row, true); 306 if (!loc.getRegionInfo().isOffline()) { 307 regionName = loc.getRegionInfo().getRegionName(); 308 server = connection.getAdmin(loc.getServerName()); 309 if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1) { 310 break; 311 } 312 } 313 Thread.sleep(40); 314 } 315 // verify the compactions took place and that we didn't just time out 316 assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1); 317 318 // change the compaction.min config option for this table to 5 319 LOG.info("hbase.hstore.compaction.min should now be 5"); 320 HTableDescriptor htd = new HTableDescriptor(table.getTableDescriptor()); 321 htd.setValue("hbase.hstore.compaction.min", String.valueOf(5)); 322 admin.modifyTable(tableName, htd); 323 Pair<Integer, Integer> st = admin.getAlterStatus(tableName); 324 while (null != st && st.getFirst() > 0) { 325 LOG.debug(st.getFirst() + " regions left to update"); 326 Thread.sleep(40); 327 st = admin.getAlterStatus(tableName); 328 } 329 LOG.info("alter status finished"); 330 331 // Create 3 more store files. 332 performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 10); 333 334 // Issue a compaction request 335 admin.compact(tableName); 336 337 // This time, the compaction request should not happen 338 Thread.sleep(10 * 1000); 339 loc = locator.getRegionLocation(row, true); 340 regionName = loc.getRegionInfo().getRegionName(); 341 server = connection.getAdmin(loc.getServerName()); 342 int sfCount = ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size(); 343 assertTrue(sfCount > 1); 344 345 // change an individual CF's config option to 2 & online schema update 346 LOG.info("hbase.hstore.compaction.min should now be 2"); 347 HColumnDescriptor hcd = new HColumnDescriptor(htd.getFamily(FAMILY)); 348 hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2)); 349 htd.modifyFamily(hcd); 350 admin.modifyTable(tableName, htd); 351 st = admin.getAlterStatus(tableName); 352 while (null != st && st.getFirst() > 0) { 353 LOG.debug(st.getFirst() + " regions left to update"); 354 Thread.sleep(40); 355 st = admin.getAlterStatus(tableName); 356 } 357 LOG.info("alter status finished"); 358 359 // Issue a compaction request 360 admin.compact(tableName); 361 362 // poll wait for the compactions to happen 363 for (int i = 0; i < 10 * 1000 / 40; ++i) { 364 loc = locator.getRegionLocation(row, true); 365 regionName = loc.getRegionInfo().getRegionName(); 366 try { 367 server = connection.getAdmin(loc.getServerName()); 368 if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount) { 369 break; 370 } 371 } catch (Exception e) { 372 LOG.debug("Waiting for region to come online: " + Bytes.toString(regionName)); 373 } 374 Thread.sleep(40); 375 } 376 377 // verify the compaction took place and that we didn't just time out 378 assertTrue(ProtobufUtil.getStoreFiles( 379 server, regionName, FAMILY).size() < sfCount); 380 381 // Finally, ensure that we can remove a custom config value after we made it 382 LOG.info("Removing CF config value"); 383 LOG.info("hbase.hstore.compaction.min should now be 5"); 384 hcd = new HColumnDescriptor(htd.getFamily(FAMILY)); 385 hcd.setValue("hbase.hstore.compaction.min", null); 386 htd.modifyFamily(hcd); 387 admin.modifyTable(tableName, htd); 388 st = admin.getAlterStatus(tableName); 389 while (null != st && st.getFirst() > 0) { 390 LOG.debug(st.getFirst() + " regions left to update"); 391 Thread.sleep(40); 392 st = admin.getAlterStatus(tableName); 393 } 394 LOG.info("alter status finished"); 395 assertNull(table.getTableDescriptor().getFamily(FAMILY).getValue( 396 "hbase.hstore.compaction.min")); 397 } 398 } 399 } 400 } 401 402 @Test 403 public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException { 404 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 405 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 406 List actions = (List) new ArrayList(); 407 Object[] results = new Object[2]; 408 // create an empty Put 409 Put put1 = new Put(ROW); 410 actions.add(put1); 411 412 Put put2 = new Put(ANOTHERROW); 413 put2.addColumn(FAMILY, QUALIFIER, VALUE); 414 actions.add(put2); 415 416 table.batch(actions, results); 417 fail("Empty Put should have failed the batch call"); 418 } catch (IllegalArgumentException iae) { 419 } 420 } 421 422 // Test Table.batch with large amount of mutations against the same key. 423 // It used to trigger read lock's "Maximum lock count exceeded" Error. 424 @Test 425 public void testHTableWithLargeBatch() throws IOException, InterruptedException { 426 int sixtyFourK = 64 * 1024; 427 List actions = new ArrayList(); 428 Object[] results = new Object[(sixtyFourK + 1) * 2]; 429 430 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 431 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 432 433 for (int i = 0; i < sixtyFourK + 1; i++) { 434 Put put1 = new Put(ROW); 435 put1.addColumn(FAMILY, QUALIFIER, VALUE); 436 actions.add(put1); 437 438 Put put2 = new Put(ANOTHERROW); 439 put2.addColumn(FAMILY, QUALIFIER, VALUE); 440 actions.add(put2); 441 } 442 443 table.batch(actions, results); 444 } 445 } 446 447 @Test 448 public void testBatchWithRowMutation() throws Exception { 449 LOG.info("Starting testBatchWithRowMutation"); 450 byte [][] QUALIFIERS = new byte [][] { 451 Bytes.toBytes("a"), Bytes.toBytes("b") 452 }; 453 454 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 455 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 456 457 RowMutations arm = RowMutations.of(Collections.singletonList( 458 new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE))); 459 Object[] batchResult = new Object[1]; 460 table.batch(Arrays.asList(arm), batchResult); 461 462 Get g = new Get(ROW); 463 Result r = table.get(g); 464 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); 465 466 arm = RowMutations.of(Arrays.asList( 467 new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE), 468 new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0]))); 469 table.batch(Arrays.asList(arm), batchResult); 470 r = table.get(g); 471 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); 472 assertNull(r.getValue(FAMILY, QUALIFIERS[0])); 473 474 // Test that we get the correct remote exception for RowMutations from batch() 475 try { 476 arm = RowMutations.of(Collections.singletonList( 477 new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE))); 478 table.batch(Arrays.asList(arm), batchResult); 479 fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException"); 480 } catch (RetriesExhaustedWithDetailsException e) { 481 String msg = e.getMessage(); 482 assertTrue(msg.contains("NoSuchColumnFamilyException")); 483 } 484 } 485 } 486 487 @Test 488 public void testBatchWithCheckAndMutate() throws Exception { 489 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 490 byte[] row1 = Bytes.toBytes("row1"); 491 byte[] row2 = Bytes.toBytes("row2"); 492 byte[] row3 = Bytes.toBytes("row3"); 493 byte[] row4 = Bytes.toBytes("row4"); 494 byte[] row5 = Bytes.toBytes("row5"); 495 byte[] row6 = Bytes.toBytes("row6"); 496 byte[] row7 = Bytes.toBytes("row7"); 497 498 table.put(Arrays.asList( 499 new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 500 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 501 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 502 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), 503 new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), 504 new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)), 505 new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))); 506 507 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1) 508 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 509 .build(new RowMutations(row1) 510 .add((Mutation) new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g"))) 511 .add((Mutation) new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A"))) 512 .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L)) 513 .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); 514 Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); 515 RowMutations mutations = new RowMutations(row3) 516 .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) 517 .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 518 .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L)) 519 .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 520 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4) 521 .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) 522 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); 523 Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); 524 CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6) 525 .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)) 526 .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1)); 527 CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7) 528 .ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")) 529 .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))); 530 531 List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put, 532 checkAndMutate3, checkAndMutate4); 533 Object[] results = new Object[actions.size()]; 534 table.batch(actions, results); 535 536 CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0]; 537 assertTrue(checkAndMutateResult.isSuccess()); 538 assertEquals(3L, 539 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C")))); 540 assertEquals("d", 541 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D")))); 542 543 assertEquals("b", 544 Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B")))); 545 546 Result result = (Result) results[2]; 547 assertTrue(result.getExists()); 548 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 549 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 550 551 checkAndMutateResult = (CheckAndMutateResult) results[3]; 552 assertFalse(checkAndMutateResult.isSuccess()); 553 assertNull(checkAndMutateResult.getResult()); 554 555 assertTrue(((Result) results[4]).isEmpty()); 556 557 checkAndMutateResult = (CheckAndMutateResult) results[5]; 558 assertTrue(checkAndMutateResult.isSuccess()); 559 assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult() 560 .getValue(FAMILY, Bytes.toBytes("F")))); 561 562 checkAndMutateResult = (CheckAndMutateResult) results[6]; 563 assertTrue(checkAndMutateResult.isSuccess()); 564 assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult() 565 .getValue(FAMILY, Bytes.toBytes("G")))); 566 567 result = table.get(new Get(row1)); 568 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 569 assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); 570 assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); 571 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 572 573 result = table.get(new Get(row3)); 574 assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); 575 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 576 assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 577 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 578 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 579 580 result = table.get(new Get(row4)); 581 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 582 583 result = table.get(new Get(row5)); 584 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); 585 586 result = table.get(new Get(row6)); 587 assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F")))); 588 589 result = table.get(new Get(row7)); 590 assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G")))); 591 } 592 } 593 594 @Test 595 public void testHTableExistsMethodSingleRegionSingleGet() 596 throws IOException, InterruptedException { 597 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 598 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 599 600 // Test with a single region table. 601 Put put = new Put(ROW); 602 put.addColumn(FAMILY, QUALIFIER, VALUE); 603 604 Get get = new Get(ROW); 605 606 boolean exist = table.exists(get); 607 assertFalse(exist); 608 609 table.put(put); 610 611 exist = table.exists(get); 612 assertTrue(exist); 613 } 614 } 615 616 @Test 617 public void testHTableExistsMethodSingleRegionMultipleGets() 618 throws IOException, InterruptedException { 619 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 620 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 621 622 Put put = new Put(ROW); 623 put.addColumn(FAMILY, QUALIFIER, VALUE); 624 table.put(put); 625 626 List<Get> gets = new ArrayList<>(); 627 gets.add(new Get(ROW)); 628 gets.add(new Get(ANOTHERROW)); 629 630 boolean[] results = table.exists(gets); 631 assertTrue(results[0]); 632 assertFalse(results[1]); 633 } 634 } 635 636 @Test 637 public void testHTableExistsBeforeGet() throws IOException, InterruptedException { 638 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 639 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 640 641 Put put = new Put(ROW); 642 put.addColumn(FAMILY, QUALIFIER, VALUE); 643 table.put(put); 644 645 Get get = new Get(ROW); 646 647 boolean exist = table.exists(get); 648 assertEquals(true, exist); 649 650 Result result = table.get(get); 651 assertEquals(false, result.isEmpty()); 652 assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER))); 653 } 654 } 655 656 @Test 657 public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException { 658 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 659 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 660 661 final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2")); 662 Put put = new Put(ROW); 663 put.addColumn(FAMILY, QUALIFIER, VALUE); 664 table.put(put); 665 put = new Put(ROW2); 666 put.addColumn(FAMILY, QUALIFIER, VALUE); 667 table.put(put); 668 669 Get get = new Get(ROW); 670 Get get2 = new Get(ROW2); 671 ArrayList<Get> getList = new ArrayList(2); 672 getList.add(get); 673 getList.add(get2); 674 675 boolean[] exists = table.existsAll(getList); 676 assertEquals(true, exists[0]); 677 assertEquals(true, exists[1]); 678 679 Result[] result = table.get(getList); 680 assertEquals(false, result[0].isEmpty()); 681 assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER))); 682 assertEquals(false, result[1].isEmpty()); 683 assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER))); 684 } 685 } 686 687 @Test 688 public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception { 689 try (Table table = TEST_UTIL.createTable( 690 tableName, new byte[][] { FAMILY }, 691 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { 692 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 693 694 Put put = new Put(ROW); 695 put.addColumn(FAMILY, QUALIFIER, VALUE); 696 697 Get get = new Get(ROW); 698 699 boolean exist = table.exists(get); 700 assertFalse(exist); 701 702 table.put(put); 703 704 exist = table.exists(get); 705 assertTrue(exist); 706 } 707 } 708 709 @Test 710 public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception { 711 try (Table table = TEST_UTIL.createTable( 712 tableName, 713 new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { 714 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 715 716 Put put = new Put(ROW); 717 put.addColumn(FAMILY, QUALIFIER, VALUE); 718 table.put(put); 719 720 List<Get> gets = new ArrayList<>(); 721 gets.add(new Get(ANOTHERROW)); 722 gets.add(new Get(Bytes.add(ROW, new byte[]{0x00}))); 723 gets.add(new Get(ROW)); 724 gets.add(new Get(Bytes.add(ANOTHERROW, new byte[]{0x00}))); 725 726 LOG.info("Calling exists"); 727 boolean[] results = table.existsAll(gets); 728 assertFalse(results[0]); 729 assertFalse(results[1]); 730 assertTrue(results[2]); 731 assertFalse(results[3]); 732 733 // Test with the first region. 734 put = new Put(new byte[]{0x00}); 735 put.addColumn(FAMILY, QUALIFIER, VALUE); 736 table.put(put); 737 738 gets = new ArrayList<>(); 739 gets.add(new Get(new byte[]{0x00})); 740 gets.add(new Get(new byte[]{0x00, 0x00})); 741 results = table.existsAll(gets); 742 assertTrue(results[0]); 743 assertFalse(results[1]); 744 745 // Test with the last region 746 put = new Put(new byte[]{(byte) 0xff, (byte) 0xff}); 747 put.addColumn(FAMILY, QUALIFIER, VALUE); 748 table.put(put); 749 750 gets = new ArrayList<>(); 751 gets.add(new Get(new byte[]{(byte) 0xff})); 752 gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff})); 753 gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff})); 754 results = table.existsAll(gets); 755 assertFalse(results[0]); 756 assertTrue(results[1]); 757 assertFalse(results[2]); 758 } 759 } 760 761 @Test 762 public void testGetEmptyRow() throws Exception { 763 //Create a table and put in 1 row 764 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 765 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 766 767 Put put = new Put(ROW_BYTES); 768 put.addColumn(FAMILY, COL_QUAL, VAL_BYTES); 769 table.put(put); 770 771 //Try getting the row with an empty row key 772 Result res = null; 773 try { 774 res = table.get(new Get(new byte[0])); 775 fail(); 776 } catch (IllegalArgumentException e) { 777 // Expected. 778 } 779 assertTrue(res == null); 780 res = table.get(new Get(Bytes.toBytes("r1-not-exist"))); 781 assertTrue(res.isEmpty() == true); 782 res = table.get(new Get(ROW_BYTES)); 783 assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); 784 } 785 } 786 787 @Test 788 public void testConnectionDefaultUsesCodec() throws Exception { 789 ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection(); 790 assertTrue(con.hasCellBlockSupport()); 791 } 792 793 @Test 794 public void testPutWithPreBatchMutate() throws Exception { 795 testPreBatchMutate(tableName, () -> { 796 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 797 Put put = new Put(ROW); 798 put.addColumn(FAMILY, QUALIFIER, VALUE); 799 t.put(put); 800 } catch (IOException ex) { 801 throw new RuntimeException(ex); 802 } 803 }); 804 } 805 806 @Test 807 public void testRowMutationsWithPreBatchMutate() throws Exception { 808 testPreBatchMutate(tableName, () -> { 809 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 810 RowMutations rm = new RowMutations(ROW, 1); 811 Put put = new Put(ROW); 812 put.addColumn(FAMILY, QUALIFIER, VALUE); 813 rm.add(put); 814 t.mutateRow(rm); 815 } catch (IOException ex) { 816 throw new RuntimeException(ex); 817 } 818 }); 819 } 820 821 private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception { 822 HTableDescriptor desc = new HTableDescriptor(tableName); 823 desc.addCoprocessor(WaitingForScanObserver.class.getName()); 824 desc.addFamily(new HColumnDescriptor(FAMILY)); 825 TEST_UTIL.getAdmin().createTable(desc); 826 // Don't use waitTableAvailable(), because the scanner will mess up the co-processor 827 828 ExecutorService service = Executors.newFixedThreadPool(2); 829 service.execute(rn); 830 final List<Cell> cells = new ArrayList<>(); 831 service.execute(() -> { 832 try { 833 // waiting for update. 834 TimeUnit.SECONDS.sleep(3); 835 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 836 Scan scan = new Scan(); 837 try (ResultScanner scanner = t.getScanner(scan)) { 838 for (Result r : scanner) { 839 cells.addAll(Arrays.asList(r.rawCells())); 840 } 841 } 842 } 843 } catch (IOException | InterruptedException ex) { 844 throw new RuntimeException(ex); 845 } 846 }); 847 service.shutdown(); 848 service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 849 assertEquals("The write is blocking by RegionObserver#postBatchMutate" 850 + ", so the data is invisible to reader", 0, cells.size()); 851 TEST_UTIL.deleteTable(tableName); 852 } 853 854 @Test 855 public void testLockLeakWithDelta() throws Exception, Throwable { 856 HTableDescriptor desc = new HTableDescriptor(tableName); 857 desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); 858 desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); 859 desc.addFamily(new HColumnDescriptor(FAMILY)); 860 TEST_UTIL.getAdmin().createTable(desc); 861 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 862 863 // new a connection for lower retry number. 864 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 865 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 866 try (Connection con = ConnectionFactory.createConnection(copy)) { 867 HRegion region = (HRegion) find(tableName); 868 region.setTimeoutForWriteLock(10); 869 ExecutorService putService = Executors.newSingleThreadExecutor(); 870 putService.execute(() -> { 871 try (Table table = con.getTable(tableName)) { 872 Put put = new Put(ROW); 873 put.addColumn(FAMILY, QUALIFIER, VALUE); 874 // the put will be blocked by WaitingForMultiMutationsObserver. 875 table.put(put); 876 } catch (IOException ex) { 877 throw new RuntimeException(ex); 878 } 879 }); 880 ExecutorService appendService = Executors.newSingleThreadExecutor(); 881 appendService.execute(() -> { 882 Append append = new Append(ROW); 883 append.addColumn(FAMILY, QUALIFIER, VALUE); 884 try (Table table = con.getTable(tableName)) { 885 table.append(append); 886 fail("The APPEND should fail because the target lock is blocked by previous put"); 887 } catch (Exception ex) { 888 } 889 }); 890 appendService.shutdown(); 891 appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 892 WaitingForMultiMutationsObserver observer = 893 find(tableName, WaitingForMultiMutationsObserver.class); 894 observer.latch.countDown(); 895 putService.shutdown(); 896 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 897 try (Table table = con.getTable(tableName)) { 898 Result r = table.get(new Get(ROW)); 899 assertFalse(r.isEmpty()); 900 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE)); 901 } 902 } 903 HRegion region = (HRegion) find(tableName); 904 int readLockCount = region.getReadLockCount(); 905 LOG.info("readLockCount:" + readLockCount); 906 assertEquals(0, readLockCount); 907 } 908 909 @Test 910 public void testMultiRowMutations() throws Exception, Throwable { 911 HTableDescriptor desc = new HTableDescriptor(tableName); 912 desc.addCoprocessor(MultiRowMutationEndpoint.class.getName()); 913 desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); 914 desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); 915 desc.addFamily(new HColumnDescriptor(FAMILY)); 916 TEST_UTIL.getAdmin().createTable(desc); 917 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 918 919 // new a connection for lower retry number. 920 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 921 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 922 try (Connection con = ConnectionFactory.createConnection(copy)) { 923 byte[] row = Bytes.toBytes("ROW-0"); 924 byte[] rowLocked= Bytes.toBytes("ROW-1"); 925 byte[] value0 = Bytes.toBytes("VALUE-0"); 926 byte[] value1 = Bytes.toBytes("VALUE-1"); 927 byte[] value2 = Bytes.toBytes("VALUE-2"); 928 assertNoLocks(tableName); 929 ExecutorService putService = Executors.newSingleThreadExecutor(); 930 putService.execute(() -> { 931 try (Table table = con.getTable(tableName)) { 932 Put put0 = new Put(rowLocked); 933 put0.addColumn(FAMILY, QUALIFIER, value0); 934 // the put will be blocked by WaitingForMultiMutationsObserver. 935 table.put(put0); 936 } catch (IOException ex) { 937 throw new RuntimeException(ex); 938 } 939 }); 940 ExecutorService cpService = Executors.newSingleThreadExecutor(); 941 cpService.execute(() -> { 942 boolean threw; 943 Put put1 = new Put(row); 944 Put put2 = new Put(rowLocked); 945 put1.addColumn(FAMILY, QUALIFIER, value1); 946 put2.addColumn(FAMILY, QUALIFIER, value2); 947 try (Table table = con.getTable(tableName)) { 948 MultiRowMutationProtos.MutateRowsRequest request 949 = MultiRowMutationProtos.MutateRowsRequest.newBuilder() 950 .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 951 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto 952 .MutationType.PUT, put1)) 953 .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 954 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto 955 .MutationType.PUT, put2)) 956 .build(); 957 table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, 958 ROW, ROW, 959 (MultiRowMutationProtos.MultiRowMutationService exe) -> { 960 ServerRpcController controller = new ServerRpcController(); 961 CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse> 962 rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); 963 exe.mutateRows(controller, request, rpcCallback); 964 return rpcCallback.get(); 965 }); 966 threw = false; 967 } catch (Throwable ex) { 968 threw = true; 969 } 970 if (!threw) { 971 // Can't call fail() earlier because the catch would eat it. 972 fail("This cp should fail because the target lock is blocked by previous put"); 973 } 974 }); 975 cpService.shutdown(); 976 cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 977 WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class); 978 observer.latch.countDown(); 979 putService.shutdown(); 980 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 981 try (Table table = con.getTable(tableName)) { 982 Get g0 = new Get(row); 983 Get g1 = new Get(rowLocked); 984 Result r0 = table.get(g0); 985 Result r1 = table.get(g1); 986 assertTrue(r0.isEmpty()); 987 assertFalse(r1.isEmpty()); 988 assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); 989 } 990 assertNoLocks(tableName); 991 } 992 } 993 994 /** 995 * A test case for issue HBASE-17482 996 * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped 997 * onto cells in the append thread, a countdown latch is used to ensure that happened 998 * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) 999 * make the seqid/mvcc acquirement in handler thread and stamping in append thread 1000 * No countdown latch to assure cells in memstore are stamped with seqid/mvcc. 1001 * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner 1002 * with a smaller readpoint can see these data, which disobey the multi version 1003 * concurrency control rules. 1004 * This test case is to reproduce this scenario. 1005 * @throws IOException 1006 */ 1007 @Test 1008 public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException { 1009 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 1010 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 1011 //put two row first to init the scanner 1012 Put put = new Put(Bytes.toBytes("0")); 1013 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 1014 table.put(put); 1015 put = new Put(Bytes.toBytes("00")); 1016 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 1017 table.put(put); 1018 Scan scan = new Scan(); 1019 scan.setTimeRange(0, Long.MAX_VALUE); 1020 scan.setCaching(1); 1021 ResultScanner scanner = table.getScanner(scan); 1022 int rowNum = scanner.next() != null ? 1 : 0; 1023 //the started scanner shouldn't see the rows put below 1024 for (int i = 1; i < 1000; i++) { 1025 put = new Put(Bytes.toBytes(String.valueOf(i))); 1026 put.setDurability(Durability.ASYNC_WAL); 1027 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i)); 1028 table.put(put); 1029 } 1030 for (Result result : scanner) { 1031 rowNum++; 1032 } 1033 //scanner should only see two rows 1034 assertEquals(2, rowNum); 1035 scanner = table.getScanner(scan); 1036 rowNum = 0; 1037 for (Result result : scanner) { 1038 rowNum++; 1039 } 1040 // the new scanner should see all rows 1041 assertEquals(1001, rowNum); 1042 } 1043 } 1044 1045 @Test 1046 public void testPutThenGetWithMultipleThreads() throws Exception { 1047 final int THREAD_NUM = 20; 1048 final int ROUND_NUM = 10; 1049 for (int round = 0; round < ROUND_NUM; round++) { 1050 ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM); 1051 final AtomicInteger successCnt = new AtomicInteger(0); 1052 try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { 1053 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 1054 1055 for (int i = 0; i < THREAD_NUM; i++) { 1056 final int index = i; 1057 Thread t = new Thread(new Runnable() { 1058 1059 @Override 1060 public void run() { 1061 final byte[] row = Bytes.toBytes("row-" + index); 1062 final byte[] value = Bytes.toBytes("v" + index); 1063 try { 1064 Put put = new Put(row); 1065 put.addColumn(FAMILY, QUALIFIER, value); 1066 ht.put(put); 1067 Get get = new Get(row); 1068 Result result = ht.get(get); 1069 byte[] returnedValue = result.getValue(FAMILY, QUALIFIER); 1070 if (Bytes.equals(value, returnedValue)) { 1071 successCnt.getAndIncrement(); 1072 } else { 1073 LOG.error("Should be equal but not, original value: " + Bytes.toString(value) 1074 + ", returned value: " 1075 + (returnedValue == null ? "null" : Bytes.toString(returnedValue))); 1076 } 1077 } catch (Throwable e) { 1078 // do nothing 1079 } 1080 } 1081 }); 1082 threads.add(t); 1083 } 1084 for (Thread t : threads) { 1085 t.start(); 1086 } 1087 for (Thread t : threads) { 1088 t.join(); 1089 } 1090 assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get()); 1091 } 1092 TEST_UTIL.deleteTable(tableName); 1093 } 1094 } 1095 1096 private static void assertNoLocks(final TableName tableName) 1097 throws IOException, InterruptedException { 1098 HRegion region = (HRegion) find(tableName); 1099 assertEquals(0, region.getLockedRows().size()); 1100 } 1101 private static HRegion find(final TableName tableName) 1102 throws IOException, InterruptedException { 1103 HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); 1104 List<HRegion> regions = rs.getRegions(tableName); 1105 assertEquals(1, regions.size()); 1106 return regions.get(0); 1107 } 1108 1109 private static <T extends RegionObserver> T find(final TableName tableName, 1110 Class<T> clz) throws IOException, InterruptedException { 1111 HRegion region = find(tableName); 1112 Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); 1113 assertTrue("The cp instance should be " + clz.getName() 1114 + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp)); 1115 return clz.cast(cp); 1116 } 1117 1118 public static class WaitingForMultiMutationsObserver 1119 implements RegionCoprocessor, RegionObserver { 1120 final CountDownLatch latch = new CountDownLatch(1); 1121 1122 @Override 1123 public Optional<RegionObserver> getRegionObserver() { 1124 return Optional.of(this); 1125 } 1126 1127 @Override 1128 public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, 1129 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1130 try { 1131 latch.await(); 1132 } catch (InterruptedException ex) { 1133 throw new IOException(ex); 1134 } 1135 } 1136 } 1137 1138 public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver { 1139 private final CountDownLatch latch = new CountDownLatch(1); 1140 1141 @Override 1142 public Optional<RegionObserver> getRegionObserver() { 1143 return Optional.of(this); 1144 } 1145 1146 @Override 1147 public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, 1148 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1149 try { 1150 // waiting for scanner 1151 latch.await(); 1152 } catch (InterruptedException ex) { 1153 throw new IOException(ex); 1154 } 1155 } 1156 1157 @Override 1158 public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, 1159 final Scan scan, final RegionScanner s) throws IOException { 1160 latch.countDown(); 1161 return s; 1162 } 1163 } 1164 1165 static byte[] generateHugeValue(int size) { 1166 Random rand = ThreadLocalRandom.current(); 1167 byte[] value = new byte[size]; 1168 for (int i = 0; i < value.length; i++) { 1169 value[i] = (byte) rand.nextInt(256); 1170 } 1171 return value; 1172 } 1173 1174 @Test 1175 public void testScanWithBatchSizeReturnIncompleteCells() 1176 throws IOException, InterruptedException { 1177 TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName) 1178 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build()) 1179 .build(); 1180 try (Table table = TEST_UTIL.createTable(hd, null)) { 1181 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 1182 1183 Put put = new Put(ROW); 1184 put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024)); 1185 table.put(put); 1186 1187 put = new Put(ROW); 1188 put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024)); 1189 table.put(put); 1190 1191 for (int i = 2; i < 5; i++) { 1192 for (int version = 0; version < 2; version++) { 1193 put = new Put(ROW); 1194 put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024)); 1195 table.put(put); 1196 } 1197 } 1198 1199 Scan scan = new Scan(); 1200 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3) 1201 .setMaxResultSize(4 * 1024 * 1024); 1202 Result result; 1203 try (ResultScanner scanner = table.getScanner(scan)) { 1204 List<Result> list = new ArrayList<>(); 1205 /* 1206 * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; 1207 * The second scan rpc should return a result with 3 cells, because reach the batch limit 1208 * = 3; 1209 * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the 1210 * moreResultsInRegion also would be false. Finally, the client should collect all the cells 1211 * into two result: 2+3 -> 3+2; 1212 */ 1213 while ((result = scanner.next()) != null) { 1214 list.add(result); 1215 } 1216 1217 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1218 Assert.assertEquals(2, list.size()); 1219 Assert.assertEquals(3, list.get(0).size()); 1220 Assert.assertEquals(2, list.get(1).size()); 1221 } 1222 1223 scan = new Scan(); 1224 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2) 1225 .setMaxResultSize(4 * 1024 * 1024); 1226 try (ResultScanner scanner = table.getScanner(scan)) { 1227 List<Result> list = new ArrayList<>(); 1228 while ((result = scanner.next()) != null) { 1229 list.add(result); 1230 } 1231 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1232 Assert.assertEquals(3, list.size()); 1233 Assert.assertEquals(2, list.get(0).size()); 1234 Assert.assertEquals(2, list.get(1).size()); 1235 Assert.assertEquals(1, list.get(2).size()); 1236 } 1237 } 1238 } 1239}