/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing of HRegion.incrementColumnValue, HRegion.increment, and HRegion.append
 */
@Tag(VerySlowRegionServerTests.TAG)
@Tag(LargeTests.TAG)
Starts 100 threads 085public class TestAtomicOperation { 086 087 private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class); 088 private String name; 089 090 HRegion region = null; 091 private HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 092 093 // Test names 094 static byte[] tableName; 095 static final byte[] qual1 = Bytes.toBytes("qual1"); 096 static final byte[] qual2 = Bytes.toBytes("qual2"); 097 static final byte[] qual3 = Bytes.toBytes("qual3"); 098 static final byte[] value1 = Bytes.toBytes("value1"); 099 static final byte[] value2 = Bytes.toBytes("value2"); 100 static final byte[] row = Bytes.toBytes("rowA"); 101 static final byte[] row2 = Bytes.toBytes("rowB"); 102 103 @BeforeEach 104 public void setup(TestInfo testInfo) { 105 this.name = testInfo.getTestMethod().get().getName(); 106 tableName = Bytes.toBytes(name); 107 } 108 109 @AfterEach 110 public void teardown() throws IOException { 111 if (region != null) { 112 CacheConfig cacheConfig = region.getStores().get(0).getCacheConfig(); 113 region.close(); 114 WAL wal = region.getWAL(); 115 if (wal != null) { 116 wal.close(); 117 } 118 cacheConfig.getBlockCache().ifPresent(BlockCache::shutdown); 119 region = null; 120 } 121 } 122 123 ////////////////////////////////////////////////////////////////////////////// 124 // New tests that doesn't spin up a mini cluster but rather just test the 125 // individual code pieces in the HRegion. 126 ////////////////////////////////////////////////////////////////////////////// 127 128 /** 129 * Test basic append operation. More tests in 130 * {@link org.apache.hadoop.hbase.client.FromClientSideTest5#testAppend()}. 131 */ 132 @Test 133 public void testAppend() throws IOException { 134 initHRegion(tableName, name, fam1); 135 String v1 = 136 "Ultimate Answer to the Ultimate Question of Life," + " The Universe, and Everything"; 137 String v2 = " is... 
42."; 138 Append a = new Append(row); 139 a.setReturnResults(false); 140 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); 141 a.addColumn(fam1, qual2, Bytes.toBytes(v2)); 142 assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty()); 143 a = new Append(row); 144 a.addColumn(fam1, qual1, Bytes.toBytes(v2)); 145 a.addColumn(fam1, qual2, Bytes.toBytes(v1)); 146 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 147 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), result.getValue(fam1, qual1))); 148 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), result.getValue(fam1, qual2))); 149 } 150 151 @Test 152 public void testAppendWithMultipleFamilies() throws IOException { 153 final byte[] fam3 = Bytes.toBytes("colfamily31"); 154 initHRegion(tableName, name, fam1, fam2, fam3); 155 String v1 = "Appended"; 156 String v2 = "Value"; 157 158 Append a = new Append(row); 159 a.setReturnResults(false); 160 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); 161 a.addColumn(fam2, qual2, Bytes.toBytes(v2)); 162 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 163 assertTrue(result.isEmpty(), 164 "Expected an empty result but result contains " + result.size() + " keys"); 165 166 a = new Append(row); 167 a.addColumn(fam2, qual2, Bytes.toBytes(v1)); 168 a.addColumn(fam1, qual1, Bytes.toBytes(v2)); 169 a.addColumn(fam3, qual3, Bytes.toBytes(v2)); 170 a.addColumn(fam1, qual2, Bytes.toBytes(v1)); 171 172 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 173 174 byte[] actualValue1 = result.getValue(fam1, qual1); 175 byte[] actualValue2 = result.getValue(fam2, qual2); 176 byte[] actualValue3 = result.getValue(fam3, qual3); 177 byte[] actualValue4 = result.getValue(fam1, qual2); 178 179 assertNotNull(actualValue1, "Value1 should bot be null"); 180 assertNotNull(actualValue2, "Value2 should bot be null"); 181 assertNotNull(actualValue3, "Value3 should bot be null"); 182 assertNotNull(actualValue4, 
"Value4 should bot be null"); 183 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), actualValue1)); 184 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), actualValue2)); 185 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2), actualValue3)); 186 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1), actualValue4)); 187 } 188 189 @Test 190 public void testAppendWithNonExistingFamily() throws IOException { 191 initHRegion(tableName, name, fam1); 192 final String v1 = "Value"; 193 final Append a = new Append(row); 194 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); 195 a.addColumn(fam2, qual2, Bytes.toBytes(v1)); 196 Result result = null; 197 try { 198 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 199 fail("Append operation should fail with NoSuchColumnFamilyException."); 200 } catch (NoSuchColumnFamilyException e) { 201 assertEquals(null, result); 202 } catch (Exception e) { 203 fail("Append operation should fail with NoSuchColumnFamilyException."); 204 } 205 } 206 207 @Test 208 public void testIncrementWithNonExistingFamily() throws IOException { 209 initHRegion(tableName, name, fam1); 210 final Increment inc = new Increment(row); 211 inc.addColumn(fam1, qual1, 1); 212 inc.addColumn(fam2, qual2, 1); 213 inc.setDurability(Durability.ASYNC_WAL); 214 try { 215 region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE); 216 } catch (NoSuchColumnFamilyException e) { 217 final Get g = new Get(row); 218 final Result result = region.get(g); 219 assertEquals(null, result.getValue(fam1, qual1)); 220 assertEquals(null, result.getValue(fam2, qual2)); 221 } catch (Exception e) { 222 fail("Increment operation should fail with NoSuchColumnFamilyException."); 223 } 224 } 225 226 /** 227 * Test multi-threaded increments. 
228 */ 229 @Test 230 public void testIncrementMultiThreads() throws IOException { 231 boolean fast = true; 232 LOG.info("Starting test testIncrementMultiThreads"); 233 // run a with mixed column families (1 and 3 versions) 234 initHRegion(tableName, name, new int[] { 1, 3 }, fam1, fam2); 235 236 // Create 100 threads, each will increment by its own quantity. All 100 threads update the 237 // same row over two column families. 238 int numThreads = 100; 239 int incrementsPerThread = 1000; 240 Incrementer[] all = new Incrementer[numThreads]; 241 int expectedTotal = 0; 242 // create all threads 243 for (int i = 0; i < numThreads; i++) { 244 all[i] = new Incrementer(region, i, i, incrementsPerThread); 245 expectedTotal += (i * incrementsPerThread); 246 } 247 248 // run all threads 249 for (int i = 0; i < numThreads; i++) { 250 all[i].start(); 251 } 252 253 // wait for all threads to finish 254 for (int i = 0; i < numThreads; i++) { 255 try { 256 all[i].join(); 257 } catch (InterruptedException e) { 258 LOG.info("Ignored", e); 259 } 260 } 261 assertICV(row, fam1, qual1, expectedTotal, fast); 262 assertICV(row, fam1, qual2, expectedTotal * 2, fast); 263 assertICV(row, fam2, qual3, expectedTotal * 3, fast); 264 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal); 265 } 266 267 private void assertICV(byte[] row, byte[] familiy, byte[] qualifier, long amount, boolean fast) 268 throws IOException { 269 // run a get and see? 270 Get get = new Get(row); 271 if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); 272 get.addColumn(familiy, qualifier); 273 Result result = region.get(get); 274 assertEquals(1, result.size()); 275 276 Cell kv = result.rawCells()[0]; 277 long r = Bytes.toLong(CellUtil.cloneValue(kv)); 278 assertEquals(amount, r); 279 } 280 281 private void initHRegion(byte[] tableName, String callingMethod, byte[]... 
families) 282 throws IOException { 283 initHRegion(tableName, callingMethod, null, families); 284 } 285 286 private void initHRegion(byte[] tableName, String callingMethod, int[] maxVersions, 287 byte[]... families) throws IOException { 288 TableDescriptorBuilder builder = 289 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); 290 291 int i = 0; 292 for (byte[] family : families) { 293 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(family) 294 .setMaxVersions(maxVersions != null ? maxVersions[i++] : 1).build(); 295 builder.setColumnFamily(familyDescriptor); 296 } 297 TableDescriptor tableDescriptor = builder.build(); 298 RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(); 299 region = TEST_UTIL.createLocalHRegion(info, tableDescriptor); 300 } 301 302 /** 303 * A thread that makes increment calls always on the same row, this.row against two column 304 * families on this row. 305 */ 306 public static class Incrementer extends Thread { 307 308 private final Region region; 309 private final int numIncrements; 310 private final int amount; 311 312 public Incrementer(Region region, int threadNumber, int amount, int numIncrements) { 313 super("Incrementer." 
+ threadNumber); 314 this.region = region; 315 this.numIncrements = numIncrements; 316 this.amount = amount; 317 setDaemon(true); 318 } 319 320 @Override 321 public void run() { 322 for (int i = 0; i < numIncrements; i++) { 323 try { 324 Increment inc = new Increment(row); 325 inc.addColumn(fam1, qual1, amount); 326 inc.addColumn(fam1, qual2, amount * 2); 327 inc.addColumn(fam2, qual3, amount * 3); 328 inc.setDurability(Durability.ASYNC_WAL); 329 Result result = region.increment(inc); 330 if (result != null) { 331 assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2, 332 Bytes.toLong(result.getValue(fam1, qual2))); 333 assertTrue(result.getValue(fam2, qual3) != null); 334 assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 3, 335 Bytes.toLong(result.getValue(fam2, qual3))); 336 assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2, 337 Bytes.toLong(result.getValue(fam1, qual2))); 338 long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1)) * 3; 339 long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3)); 340 assertEquals(fam1Increment, fam2Increment, 341 "fam1=" + fam1Increment + ", fam2=" + fam2Increment); 342 } 343 } catch (IOException e) { 344 e.printStackTrace(); 345 } 346 } 347 } 348 } 349 350 @Test 351 public void testAppendMultiThreads() throws IOException { 352 LOG.info("Starting test testAppendMultiThreads"); 353 // run a with mixed column families (1 and 3 versions) 354 initHRegion(tableName, name, new int[] { 1, 3 }, fam1, fam2); 355 356 int numThreads = 100; 357 int opsPerThread = 100; 358 AtomicOperation[] all = new AtomicOperation[numThreads]; 359 final byte[] val = new byte[] { 1 }; 360 361 AtomicInteger failures = new AtomicInteger(0); 362 // create all threads 363 for (int i = 0; i < numThreads; i++) { 364 all[i] = new AtomicOperation(region, opsPerThread, null, failures) { 365 @Override 366 public void run() { 367 for (int i = 0; i < numOps; i++) { 368 try { 369 Append a = new Append(row); 370 a.addColumn(fam1, 
qual1, val); 371 a.addColumn(fam1, qual2, val); 372 a.addColumn(fam2, qual3, val); 373 a.setDurability(Durability.ASYNC_WAL); 374 region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 375 376 Get g = new Get(row); 377 Result result = region.get(g); 378 assertEquals(result.getValue(fam1, qual1).length, 379 result.getValue(fam1, qual2).length); 380 assertEquals(result.getValue(fam1, qual1).length, 381 result.getValue(fam2, qual3).length); 382 } catch (IOException e) { 383 e.printStackTrace(); 384 failures.incrementAndGet(); 385 fail(); 386 } 387 } 388 } 389 }; 390 } 391 392 // run all threads 393 for (int i = 0; i < numThreads; i++) { 394 all[i].start(); 395 } 396 397 // wait for all threads to finish 398 for (int i = 0; i < numThreads; i++) { 399 try { 400 all[i].join(); 401 } catch (InterruptedException e) { 402 } 403 } 404 assertEquals(0, failures.get()); 405 Get g = new Get(row); 406 Result result = region.get(g); 407 assertEquals(10000, result.getValue(fam1, qual1).length); 408 assertEquals(10000, result.getValue(fam1, qual2).length); 409 assertEquals(10000, result.getValue(fam2, qual3).length); 410 } 411 412 /** 413 * Test multi-threaded row mutations. 
414 */ 415 @Test 416 public void testRowMutationMultiThreads() throws IOException { 417 LOG.info("Starting test testRowMutationMultiThreads"); 418 initHRegion(tableName, name, fam1); 419 420 // create 10 threads, each will alternate between adding and 421 // removing a column 422 int numThreads = 10; 423 int opsPerThread = 250; 424 AtomicOperation[] all = new AtomicOperation[numThreads]; 425 426 AtomicLong timeStamps = new AtomicLong(0); 427 AtomicInteger failures = new AtomicInteger(0); 428 // create all threads 429 for (int i = 0; i < numThreads; i++) { 430 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) { 431 @Override 432 public void run() { 433 boolean op = true; 434 for (int i = 0; i < numOps; i++) { 435 try { 436 // throw in some flushes 437 if (i % 10 == 0) { 438 synchronized (region) { 439 LOG.debug("flushing"); 440 region.flush(true); 441 if (i % 100 == 0) { 442 region.compact(false); 443 } 444 } 445 } 446 long ts = timeStamps.incrementAndGet(); 447 RowMutations rm = new RowMutations(row); 448 if (op) { 449 Put p = new Put(row, ts); 450 p.addColumn(fam1, qual1, value1); 451 p.setDurability(Durability.ASYNC_WAL); 452 rm.add(p); 453 Delete d = new Delete(row); 454 d.addColumns(fam1, qual2, ts); 455 d.setDurability(Durability.ASYNC_WAL); 456 rm.add(d); 457 } else { 458 Delete d = new Delete(row); 459 d.addColumns(fam1, qual1, ts); 460 d.setDurability(Durability.ASYNC_WAL); 461 rm.add(d); 462 Put p = new Put(row, ts); 463 p.addColumn(fam1, qual2, value2); 464 p.setDurability(Durability.ASYNC_WAL); 465 rm.add(p); 466 } 467 region.mutateRow(rm); 468 op ^= true; 469 // check: should always see exactly one column 470 Get g = new Get(row); 471 Result r = region.get(g); 472 if (r.size() != 1) { 473 LOG.debug(Objects.toString(r)); 474 failures.incrementAndGet(); 475 fail(); 476 } 477 } catch (IOException e) { 478 e.printStackTrace(); 479 failures.incrementAndGet(); 480 fail(); 481 } 482 } 483 } 484 }; 485 } 486 487 // run all threads 488 for 
(int i = 0; i < numThreads; i++) { 489 all[i].start(); 490 } 491 492 // wait for all threads to finish 493 for (int i = 0; i < numThreads; i++) { 494 try { 495 all[i].join(); 496 } catch (InterruptedException e) { 497 } 498 } 499 assertEquals(0, failures.get()); 500 } 501 502 /** 503 * Test multi-threaded region mutations. 504 */ 505 @Test 506 public void testMultiRowMutationMultiThreads() throws IOException { 507 508 LOG.info("Starting test testMultiRowMutationMultiThreads"); 509 initHRegion(tableName, name, fam1); 510 511 // create 10 threads, each will alternate between adding and 512 // removing a column 513 int numThreads = 10; 514 int opsPerThread = 250; 515 AtomicOperation[] all = new AtomicOperation[numThreads]; 516 517 AtomicLong timeStamps = new AtomicLong(0); 518 AtomicInteger failures = new AtomicInteger(0); 519 final List<byte[]> rowsToLock = Arrays.asList(row, row2); 520 // create all threads 521 for (int i = 0; i < numThreads; i++) { 522 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) { 523 @Override 524 public void run() { 525 boolean op = true; 526 for (int i = 0; i < numOps; i++) { 527 try { 528 // throw in some flushes 529 if (i % 10 == 0) { 530 synchronized (region) { 531 LOG.debug("flushing"); 532 region.flush(true); 533 if (i % 100 == 0) { 534 region.compact(false); 535 } 536 } 537 } 538 long ts = timeStamps.incrementAndGet(); 539 List<Mutation> mrm = new ArrayList<>(); 540 if (op) { 541 Put p = new Put(row2, ts); 542 p.addColumn(fam1, qual1, value1); 543 p.setDurability(Durability.ASYNC_WAL); 544 mrm.add(p); 545 Delete d = new Delete(row); 546 d.addColumns(fam1, qual1, ts); 547 d.setDurability(Durability.ASYNC_WAL); 548 mrm.add(d); 549 } else { 550 Delete d = new Delete(row2); 551 d.addColumns(fam1, qual1, ts); 552 d.setDurability(Durability.ASYNC_WAL); 553 mrm.add(d); 554 Put p = new Put(row, ts); 555 p.setDurability(Durability.ASYNC_WAL); 556 p.addColumn(fam1, qual1, value2); 557 mrm.add(p); 558 } 559 
region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); 560 op ^= true; 561 // check: should always see exactly one column 562 Scan s = new Scan().withStartRow(row); 563 RegionScanner rs = region.getScanner(s); 564 List<Cell> r = new ArrayList<>(); 565 while (rs.next(r)) 566 ; 567 rs.close(); 568 if (r.size() != 1) { 569 LOG.debug(Objects.toString(r)); 570 failures.incrementAndGet(); 571 fail(); 572 } 573 } catch (IOException e) { 574 e.printStackTrace(); 575 failures.incrementAndGet(); 576 fail(); 577 } 578 } 579 } 580 }; 581 } 582 583 // run all threads 584 for (int i = 0; i < numThreads; i++) { 585 all[i].start(); 586 } 587 588 // wait for all threads to finish 589 for (int i = 0; i < numThreads; i++) { 590 try { 591 all[i].join(); 592 } catch (InterruptedException e) { 593 } 594 } 595 assertEquals(0, failures.get()); 596 } 597 598 public static class AtomicOperation extends Thread { 599 protected final HRegion region; 600 protected final int numOps; 601 protected final AtomicLong timeStamps; 602 protected final AtomicInteger failures; 603 604 public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, 605 AtomicInteger failures) { 606 this.region = region; 607 this.numOps = numOps; 608 this.timeStamps = timeStamps; 609 this.failures = failures; 610 } 611 } 612 613 private static CountDownLatch latch = new CountDownLatch(1); 614 615 private enum TestStep { 616 INIT, // initial put of 10 to set value of the cell 617 PUT_STARTED, // began doing a put of 50 to cell 618 PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC). 619 CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11 620 CHECKANDPUT_COMPLETED // completed checkAndPut 621 // NOTE: at the end of these steps, the value of the cell should be 50, not 11! 
622 } 623 624 private static volatile TestStep testStep = TestStep.INIT; 625 private final String family = "f1"; 626 627 /** 628 * Test written as a verifier for HBASE-7051, CheckAndPut should properly read MVCC. Moved into 629 * TestAtomicOperation from its original location, TestHBase7051 630 */ 631 @Test 632 public void testPutAndCheckAndPutInParallel() throws Exception { 633 Configuration conf = TEST_UTIL.getConfiguration(); 634 conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class); 635 TableDescriptorBuilder tableDescriptorBuilder = 636 TableDescriptorBuilder.newBuilder(TableName.valueOf(name)); 637 ColumnFamilyDescriptor columnFamilyDescriptor = 638 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build(); 639 tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor); 640 this.region = TEST_UTIL.createLocalHRegion(tableDescriptorBuilder.build(), null, null); 641 Put[] puts = new Put[1]; 642 Put put = new Put(Bytes.toBytes("r1")); 643 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10")); 644 puts[0] = put; 645 646 region.batchMutate(puts); 647 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 648 ctx.addThread(new PutThread(ctx, region)); 649 ctx.addThread(new CheckAndPutThread(ctx, region)); 650 ctx.startThreads(); 651 while (testStep != TestStep.CHECKANDPUT_COMPLETED) { 652 Thread.sleep(100); 653 } 654 ctx.stop(); 655 Scan s = new Scan(); 656 RegionScanner scanner = region.getScanner(s); 657 List<Cell> results = new ArrayList<>(); 658 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build(); 659 scanner.next(results, scannerContext); 660 for (Cell keyValue : results) { 661 assertEquals("50", Bytes.toString(CellUtil.cloneValue(keyValue))); 662 } 663 } 664 665 private class PutThread extends TestThread { 666 private Region region; 667 668 PutThread(TestContext ctx, Region region) { 669 super(ctx); 670 this.region = region; 671 } 672 
673 @Override 674 public void doWork() throws Exception { 675 Put[] puts = new Put[1]; 676 Put put = new Put(Bytes.toBytes("r1")); 677 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50")); 678 puts[0] = put; 679 testStep = TestStep.PUT_STARTED; 680 region.batchMutate(puts); 681 } 682 } 683 684 private class CheckAndPutThread extends TestThread { 685 private Region region; 686 687 CheckAndPutThread(TestContext ctx, Region region) { 688 super(ctx); 689 this.region = region; 690 } 691 692 @Override 693 public void doWork() throws Exception { 694 Put[] puts = new Put[1]; 695 Put put = new Put(Bytes.toBytes("r1")); 696 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11")); 697 puts[0] = put; 698 while (testStep != TestStep.PUT_COMPLETED) { 699 Thread.sleep(100); 700 } 701 testStep = TestStep.CHECKANDPUT_STARTED; 702 region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"), 703 CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put); 704 testStep = TestStep.CHECKANDPUT_COMPLETED; 705 } 706 } 707 708 public static class MockHRegion extends HRegion { 709 710 public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf, 711 final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) { 712 super(tableDir, log, fs, conf, regionInfo, htd, rsServices); 713 } 714 715 @Override 716 protected RowLock getRowLockInternal(final byte[] row, boolean readLock, 717 final RowLock prevRowlock) throws IOException { 718 if (testStep == TestStep.CHECKANDPUT_STARTED) { 719 latch.countDown(); 720 } 721 return new WrappedRowLock(super.getRowLockInternal(row, readLock, null)); 722 } 723 724 public class WrappedRowLock implements RowLock { 725 726 private final RowLock rowLock; 727 728 private WrappedRowLock(RowLock rowLock) { 729 this.rowLock = rowLock; 730 } 731 732 @Override 733 public void release() { 734 if (testStep == TestStep.INIT) { 735 
this.rowLock.release(); 736 return; 737 } 738 739 if (testStep == TestStep.PUT_STARTED) { 740 try { 741 testStep = TestStep.PUT_COMPLETED; 742 this.rowLock.release(); 743 // put has been written to the memstore and the row lock has been released, but the 744 // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of 745 // operations would cause the non-atomicity to show up: 746 // 1) Put releases row lock (where we are now) 747 // 2) CheckAndPut grabs row lock and reads the value prior to the put (10) 748 // because the MVCC has not advanced 749 // 3) Put advances MVCC 750 // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock 751 // (see below), and then wait some more to give the checkAndPut time to read the old 752 // value. 753 latch.await(); 754 Thread.sleep(1000); 755 } catch (InterruptedException e) { 756 Thread.currentThread().interrupt(); 757 } 758 } else if (testStep == TestStep.CHECKANDPUT_STARTED) { 759 this.rowLock.release(); 760 } 761 } 762 } 763 } 764}