/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing of HRegion.incrementColumnValue, HRegion.increment,
 * and HRegion.append
 */
@Category({VerySlowRegionServerTests.class, LargeTests.class}) // Starts 100 threads
public class TestAtomicOperation {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestAtomicOperation.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class);
  @Rule public TestName name = new TestName();

  HRegion region = null;
  private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();

  // Test names
  static byte[] tableName;
  static final byte[] qual1 = Bytes.toBytes("qual1");
  static final byte[] qual2 = Bytes.toBytes("qual2");
  static final byte[] qual3 = Bytes.toBytes("qual3");
  static final byte[] value1 = Bytes.toBytes("value1");
  static final byte[] value2 = Bytes.toBytes("value2");
  static final byte[] row = Bytes.toBytes("rowA");
  static final byte[] row2 = Bytes.toBytes("rowB");

  @Before
  public void setup() {
    tableName = Bytes.toBytes(name.getMethodName());
  }

  @After
  public void teardown() throws IOException {
    if (region != null) {
      CacheConfig cacheConfig = region.getStores().get(0).getCacheConfig();
      region.close();
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.close();
      }
      cacheConfig.getBlockCache().ifPresent(BlockCache::shutdown);
      region = null;
    }
  }

  //////////////////////////////////////////////////////////////////////////////
  // New tests that don't spin up a mini cluster but rather just test the
  // individual code pieces in the HRegion.
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Test basic append operation.
   * More tests in
   * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
   */
  @Test
  public void testAppend() throws IOException {
    initHRegion(tableName, name.getMethodName(), fam1);
    String v1 = "Ultimate Answer to the Ultimate Question of Life," +
        " The Universe, and Everything";
    String v2 = " is... 42.";
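    // First append uses setReturnResults(false): the cells are written but the returned
    // Result is expected to be empty; the second append returns the concatenated values.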
    Append a = new Append(row);
    a.setReturnResults(false);
    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
    a.addColumn(fam1, qual2, Bytes.toBytes(v2));
    assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty());
    a = new Append(row);
    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
    a.addColumn(fam1, qual2, Bytes.toBytes(v1));
    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), result.getValue(fam1, qual1)));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), result.getValue(fam1, qual2)));
  }

  @Test
  public void testAppendWithMultipleFamilies() throws IOException {
    final byte[] fam3 = Bytes.toBytes("colfamily31");
    initHRegion(tableName, name.getMethodName(), fam1, fam2, fam3);
    String v1 = "Appended";
    String v2 = "Value";

    Append a = new Append(row);
    a.setReturnResults(false);
    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
    a.addColumn(fam2, qual2, Bytes.toBytes(v2));
    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
    assertTrue("Expected an empty result but result contains " + result.size() + " keys",
        result.isEmpty());

    a = new Append(row);
    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
    a.addColumn(fam3, qual3, Bytes.toBytes(v2));
    a.addColumn(fam1, qual2, Bytes.toBytes(v1));

    result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);

    byte[] actualValue1 = result.getValue(fam1, qual1);
    byte[] actualValue2 = result.getValue(fam2, qual2);
    byte[] actualValue3 = result.getValue(fam3, qual3);
    byte[] actualValue4 = result.getValue(fam1, qual2);

    assertNotNull("Value1 should not be null", actualValue1);
    assertNotNull("Value2 should not be null", actualValue2);
    assertNotNull("Value3 should not be null", actualValue3);
    assertNotNull("Value4 should not be null", actualValue4);
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), actualValue1));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), actualValue2));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2), actualValue3));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1), actualValue4));
  }

  @Test
  public void testAppendWithNonExistingFamily() throws IOException {
    initHRegion(tableName, name.getMethodName(), fam1);
    final String v1 = "Value";
    final Append a = new Append(row);
    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
    Result result = null;
    try {
      result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
      fail("Append operation should fail with NoSuchColumnFamilyException.");
    } catch (NoSuchColumnFamilyException e) {
      assertEquals(null, result);
    } catch (Exception e) {
      fail("Append operation should fail with NoSuchColumnFamilyException.");
    }
  }

  @Test
  public void testIncrementWithNonExistingFamily() throws IOException {
    initHRegion(tableName, name.getMethodName(), fam1);
    final Increment inc = new Increment(row);
    inc.addColumn(fam1, qual1, 1);
    inc.addColumn(fam2, qual2, 1);
    inc.setDurability(Durability.ASYNC_WAL);
    try {
      region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
      fail("Increment operation should fail with NoSuchColumnFamilyException.");
    } catch (NoSuchColumnFamilyException e) {
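      // Expected path: the increment names a column family the table does not have, so no
      // cell in the row should have been updated.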
      final Get g = new Get(row);
      final Result result = region.get(g);
      assertEquals(null, result.getValue(fam1, qual1));
      assertEquals(null, result.getValue(fam2, qual2));
    } catch (Exception e) {
      fail("Increment operation should fail with NoSuchColumnFamilyException.");
    }
  }

  /**
   * Test multi-threaded increments.
   */
  @Test
  public void testIncrementMultiThreads() throws IOException {
    boolean fast = true;
    LOG.info("Starting test testIncrementMultiThreads");
    // run with mixed column families (1 and 3 versions)
    initHRegion(tableName, name.getMethodName(), new int[] {1, 3}, fam1, fam2);

    // Create 100 threads, each will increment by its own quantity. All 100 threads update the
    // same row over two column families.
    int numThreads = 100;
    int incrementsPerThread = 1000;
    Incrementer[] all = new Incrementer[numThreads];
    int expectedTotal = 0;
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new Incrementer(region, i, i, incrementsPerThread);
      expectedTotal += (i * incrementsPerThread);
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
        LOG.info("Ignored", e);
      }
    }
    assertICV(row, fam1, qual1, expectedTotal, fast);
    assertICV(row, fam1, qual2, expectedTotal * 2, fast);
    assertICV(row, fam2, qual3, expectedTotal * 3, fast);
    LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
  }

  private void assertICV(byte[] row,
                         byte[] columnFamily,
                         byte[] qualifier,
                         long amount,
                         boolean fast) throws IOException {
    // run a get and verify the accumulated value
    Get get = new Get(row);
    if (fast) {
      get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    }
    get.addColumn(columnFamily, qualifier);
    Result result = region.get(get);
    assertEquals(1, result.size());

    Cell kv = result.rawCells()[0];
    long r = Bytes.toLong(CellUtil.cloneValue(kv));
    assertEquals(amount, r);
  }

  private void initHRegion(byte[] tableName, String callingMethod, byte[]... families)
      throws IOException {
    initHRegion(tableName, callingMethod, null, families);
  }

  private void initHRegion(byte[] tableName, String callingMethod, int[] maxVersions,
      byte[]... families) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
    int i = 0;
    for (byte[] family : families) {
      HColumnDescriptor hcd = new HColumnDescriptor(family);
      hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
      htd.addFamily(hcd);
    }
    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
    region = TEST_UTIL.createLocalHRegion(info, htd);
  }

  /**
   * A thread that repeatedly increments the same row, {@code row}, across two column families.
   */
  public static class Incrementer extends Thread {

    private final Region region;
    private final int numIncrements;
    private final int amount;

    public Incrementer(Region region, int threadNumber, int amount, int numIncrements) {
      super("Incrementer." + threadNumber);
      this.region = region;
      this.numIncrements = numIncrements;
      this.amount = amount;
      setDaemon(true);
    }

    @Override
    public void run() {
      for (int i = 0; i < numIncrements; i++) {
        try {
          Increment inc = new Increment(row);
          inc.addColumn(fam1, qual1, amount);
          inc.addColumn(fam1, qual2, amount * 2);
          inc.addColumn(fam2, qual3, amount * 3);
          inc.setDurability(Durability.ASYNC_WAL);
          Result result = region.increment(inc);
          if (result != null) {
            assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2,
                Bytes.toLong(result.getValue(fam1, qual2)));
            assertTrue(result.getValue(fam2, qual3) != null);
            assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 3,
                Bytes.toLong(result.getValue(fam2, qual3)));
            long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1)) * 3;
            long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
            assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment,
                fam1Increment, fam2Increment);
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

  /**
   * Test multi-threaded appends: every thread should observe consistent value lengths
   * across the appended columns.
   */
  @Test
  public void testAppendMultiThreads() throws IOException {
    LOG.info("Starting test testAppendMultiThreads");
    // run with mixed column families (1 and 3 versions)
    initHRegion(tableName, name.getMethodName(), new int[] {1, 3}, fam1, fam2);

    int numThreads = 100;
    int opsPerThread = 100;
    AtomicOperation[] all = new AtomicOperation[numThreads];
    final byte[] val = new byte[]{1};

    AtomicInteger failures = new AtomicInteger(0);
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
        @Override
        public void run() {
          for (int i = 0; i < numOps; i++) {
            try {
              Append a = new Append(row);
              a.addColumn(fam1, qual1, val);
              a.addColumn(fam1, qual2, val);
              a.addColumn(fam2, qual3, val);
              a.setDurability(Durability.ASYNC_WAL);
              region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);

              Get g = new Get(row);
              Result result = region.get(g);
              assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
              assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
            } catch (IOException e) {
              e.printStackTrace();
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
      }
    }
    assertEquals(0, failures.get());
    Get g = new Get(row);
    Result result = region.get(g);
    // 100 threads x 100 appends of a single byte = 10000 bytes per qualifier
    assertEquals(10000, result.getValue(fam1, qual1).length);
    assertEquals(10000, result.getValue(fam1, qual2).length);
    assertEquals(10000, result.getValue(fam2, qual3).length);
  }

  /**
   * Test multi-threaded row mutations.
   */
  @Test
  public void testRowMutationMultiThreads() throws IOException {
    LOG.info("Starting test testRowMutationMultiThreads");
    initHRegion(tableName, name.getMethodName(), fam1);

    // create 10 threads, each will alternate between adding and
    // removing a column
    int numThreads = 10;
    int opsPerThread = 250;
    AtomicOperation[] all = new AtomicOperation[numThreads];

    AtomicLong timeStamps = new AtomicLong(0);
    AtomicInteger failures = new AtomicInteger(0);
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
        @Override
        public void run() {
          boolean op = true;
          for (int i = 0; i < numOps; i++) {
            try {
              // throw in some flushes
              if (i % 10 == 0) {
                synchronized (region) {
                  LOG.debug("flushing");
                  region.flush(true);
                  if (i % 100 == 0) {
                    region.compact(false);
                  }
                }
              }
              long ts = timeStamps.incrementAndGet();
              RowMutations rm = new RowMutations(row);
              if (op) {
                Put p = new Put(row, ts);
                p.addColumn(fam1, qual1, value1);
                p.setDurability(Durability.ASYNC_WAL);
                rm.add(p);
                Delete d = new Delete(row);
                d.addColumns(fam1, qual2, ts);
                d.setDurability(Durability.ASYNC_WAL);
                rm.add(d);
              } else {
                Delete d = new Delete(row);
                d.addColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                rm.add(d);
                Put p = new Put(row, ts);
                p.addColumn(fam1, qual2, value2);
                p.setDurability(Durability.ASYNC_WAL);
                rm.add(p);
              }
              region.mutateRow(rm);
              op ^= true;
              // check: should always see exactly one column
              Get g = new Get(row);
              Result r = region.get(g);
              if (r.size() != 1) {
                LOG.debug(Objects.toString(r));
                failures.incrementAndGet();
                fail();
              }
            } catch (IOException e) {
              e.printStackTrace();
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
      }
    }
    assertEquals(0, failures.get());
  }

  /**
   * Test multi-threaded multi-row mutations.
   */
  @Test
  public void testMultiRowMutationMultiThreads() throws IOException {
    LOG.info("Starting test testMultiRowMutationMultiThreads");
    initHRegion(tableName, name.getMethodName(), fam1);

    // create 10 threads, each will alternate between adding and
    // removing a column
    int numThreads = 10;
    int opsPerThread = 250;
    AtomicOperation[] all = new AtomicOperation[numThreads];

    AtomicLong timeStamps = new AtomicLong(0);
    AtomicInteger failures = new AtomicInteger(0);
    final List<byte[]> rowsToLock = Arrays.asList(row, row2);
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
        @Override
        public void run() {
          boolean op = true;
          for (int i = 0; i < numOps; i++) {
            try {
              // throw in some flushes
              if (i % 10 == 0) {
                synchronized (region) {
                  LOG.debug("flushing");
                  region.flush(true);
                  if (i % 100 == 0) {
                    region.compact(false);
                  }
                }
              }
              long ts = timeStamps.incrementAndGet();
              List<Mutation> mrm = new ArrayList<>();
              if (op) {
                Put p = new Put(row2, ts);
                p.addColumn(fam1, qual1, value1);
                p.setDurability(Durability.ASYNC_WAL);
                mrm.add(p);
                Delete d = new Delete(row);
                d.addColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                mrm.add(d);
              } else {
                Delete d = new Delete(row2);
                d.addColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                mrm.add(d);
                Put p = new Put(row, ts);
                p.setDurability(Durability.ASYNC_WAL);
                p.addColumn(fam1, qual1, value2);
                mrm.add(p);
              }
              region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
              op ^= true;
              // check: should always see exactly one column
              Scan s = new Scan(row);
              RegionScanner rs = region.getScanner(s);
              List<Cell> r = new ArrayList<>();
              while (rs.next(r)) {
                // keep draining the scanner until it is exhausted
              }
              rs.close();
              if (r.size() != 1) {
                LOG.debug(Objects.toString(r));
                failures.incrementAndGet();
                fail();
              }
            } catch (IOException e) {
              e.printStackTrace();
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
      }
    }
    assertEquals(0, failures.get());
  }

  /**
   * Base class for the mutation threads used by the multi-threaded tests above.
   */
  public static class AtomicOperation extends Thread {
    protected final HRegion region;
    protected final int numOps;
    protected final AtomicLong timeStamps;
    protected final AtomicInteger failures;
    protected final Random r = new Random();

    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
        AtomicInteger failures) {
      this.region = region;
      this.numOps = numOps;
      this.timeStamps = timeStamps;
      this.failures = failures;
    }
  }

  private static CountDownLatch latch = new CountDownLatch(1);
  private enum TestStep {
    INIT,                 // initial put of 10 to set value of the cell
    PUT_STARTED,          // began doing a put of 50 to cell
    PUT_COMPLETED,        // put complete (released RowLock, but may not have advanced MVCC).
    CHECKANDPUT_STARTED,  // began checkAndPut: if 10 -> 11
    CHECKANDPUT_COMPLETED // completed checkAndPut
    // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
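    // These steps are driven by testPutAndCheckAndPutInParallel below; MockHRegion and its
    // WrappedRowLock stall the Put thread so the checkAndPut can run against the pre-put
    // MVCC state.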
  }
  private static volatile TestStep testStep = TestStep.INIT;
  private final String family = "f1";

  /**
   * Test written as a verifier for HBASE-7051: checkAndPut should properly read MVCC.
   *
   * Moved into TestAtomicOperation from its original location, TestHBase7051.
   */
  @Test
  public void testPutAndCheckAndPutInParallel() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()))
        .addFamily(new HColumnDescriptor(family));
    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
    Put[] puts = new Put[1];
    Put put = new Put(Bytes.toBytes("r1"));
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
    puts[0] = put;

    region.batchMutate(puts);
    MultithreadedTestUtil.TestContext ctx =
        new MultithreadedTestUtil.TestContext(conf);
    ctx.addThread(new PutThread(ctx, region));
    ctx.addThread(new CheckAndPutThread(ctx, region));
    ctx.startThreads();
    while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
      Thread.sleep(100);
    }
    ctx.stop();
    Scan s = new Scan();
    RegionScanner scanner = region.getScanner(s);
    List<Cell> results = new ArrayList<>();
    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
    scanner.next(results, scannerContext);
    for (Cell keyValue : results) {
      assertEquals("50", Bytes.toString(CellUtil.cloneValue(keyValue)));
    }
  }

  private class PutThread extends TestThread {
    private Region region;
    PutThread(TestContext ctx, Region region) {
      super(ctx);
      this.region = region;
    }

    @Override
    public void doWork() throws Exception {
      Put[] puts = new Put[1];
      Put put = new Put(Bytes.toBytes("r1"));
      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
      puts[0] = put;
      testStep = TestStep.PUT_STARTED;
      region.batchMutate(puts);
    }
  }

  private class CheckAndPutThread extends TestThread {
    private Region region;
    CheckAndPutThread(TestContext ctx, Region region) {
      super(ctx);
      this.region = region;
    }

    @Override
    public void doWork() throws Exception {
      Put[] puts = new Put[1];
      Put put = new Put(Bytes.toBytes("r1"));
      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
      puts[0] = put;
      while (testStep != TestStep.PUT_COMPLETED) {
        Thread.sleep(100);
      }
      testStep = TestStep.CHECKANDPUT_STARTED;
      region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
          CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put);
      testStep = TestStep.CHECKANDPUT_COMPLETED;
    }
  }

  /**
   * HRegion subclass whose row locks signal and stall the test threads so that the
   * HBASE-7051 interleaving can be reproduced.
   */
  public static class MockHRegion extends HRegion {

    public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
        final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
    }

    @Override
    public RowLock getRowLockInternal(final byte[] row, boolean readLock,
        final RowLock prevRowlock) throws IOException {
      if (testStep == TestStep.CHECKANDPUT_STARTED) {
        latch.countDown();
      }
      return new WrappedRowLock(super.getRowLockInternal(row, readLock, null));
    }

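    /**
     * RowLock wrapper that, once the put of 50 has started, publishes PUT_COMPLETED and then
     * blocks on the latch until the checkAndPut has acquired the row lock, recreating the
     * pre-HBASE-7051 window in which the row lock is released before the MVCC has advanced.
     */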
    public class WrappedRowLock implements RowLock {

      private final RowLock rowLock;

      private WrappedRowLock(RowLock rowLock) {
        this.rowLock = rowLock;
      }

      @Override
      public void release() {
        if (testStep == TestStep.INIT) {
          this.rowLock.release();
          return;
        }

        if (testStep == TestStep.PUT_STARTED) {
          try {
            testStep = TestStep.PUT_COMPLETED;
            this.rowLock.release();
            // put has been written to the memstore and the row lock has been released, but the
            // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
            // operations would cause the non-atomicity to show up:
            // 1) Put releases row lock (where we are now)
            // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
            //    because the MVCC has not advanced
            // 3) Put advances MVCC
            // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
            // (see below), and then wait some more to give the checkAndPut time to read the old
            // value.
            latch.await();
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else if (testStep == TestStep.CHECKANDPUT_STARTED) {
          this.rowLock.release();
        }
      }
    }
  }
}