001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.List; 030import java.util.Objects; 031import java.util.Random; 032import java.util.concurrent.CountDownLatch; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.atomic.AtomicLong; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.CellUtil; 040import org.apache.hadoop.hbase.CompareOperator; 041import org.apache.hadoop.hbase.HBaseClassTestRule; 042import org.apache.hadoop.hbase.HBaseTestingUtility; 043import org.apache.hadoop.hbase.HColumnDescriptor; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.HRegionInfo; 046import org.apache.hadoop.hbase.HTableDescriptor; 047import org.apache.hadoop.hbase.MultithreadedTestUtil; 048import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; 049import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; 050import org.apache.hadoop.hbase.TableName; 051import org.apache.hadoop.hbase.client.Append; 052import org.apache.hadoop.hbase.client.Delete; 053import org.apache.hadoop.hbase.client.Durability; 054import org.apache.hadoop.hbase.client.Get; 055import org.apache.hadoop.hbase.client.Increment; 056import org.apache.hadoop.hbase.client.IsolationLevel; 057import org.apache.hadoop.hbase.client.Mutation; 058import org.apache.hadoop.hbase.client.Put; 059import org.apache.hadoop.hbase.client.RegionInfo; 060import org.apache.hadoop.hbase.client.Result; 061import org.apache.hadoop.hbase.client.RowMutations; 062import org.apache.hadoop.hbase.client.Scan; 063import org.apache.hadoop.hbase.client.TableDescriptor; 064import org.apache.hadoop.hbase.filter.BinaryComparator; 065import org.apache.hadoop.hbase.io.HeapSize; 066import org.apache.hadoop.hbase.io.hfile.BlockCache; 067import org.apache.hadoop.hbase.testclassification.MediumTests; 068import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.wal.WAL; 071import org.junit.After; 072import org.junit.Before; 073import org.junit.ClassRule; 074import org.junit.Rule; 075import org.junit.Test; 076import org.junit.experimental.categories.Category; 077import org.junit.rules.TestName; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081/** 082 * Testing of HRegion.incrementColumnValue, HRegion.increment, 083 * and HRegion.append 084 */ 085@Category({VerySlowRegionServerTests.class, MediumTests.class}) // Starts 100 threads 086public class TestAtomicOperation { 087 088 @ClassRule 089 public static final HBaseClassTestRule CLASS_RULE = 090 HBaseClassTestRule.forClass(TestAtomicOperation.class); 091 092 private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class); 093 @Rule public TestName name = new TestName(); 094 095 HRegion region = null; 096 private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU(); 097 098 // Test names 099 static byte[] tableName; 100 static final byte[] qual1 = Bytes.toBytes("qual1"); 101 static final byte[] qual2 = Bytes.toBytes("qual2"); 102 static final byte[] qual3 = Bytes.toBytes("qual3"); 103 static final byte[] value1 = Bytes.toBytes("value1"); 104 static final byte[] value2 = Bytes.toBytes("value2"); 105 static final byte [] row = Bytes.toBytes("rowA"); 106 static final byte [] row2 = Bytes.toBytes("rowB"); 107 108 @Before 109 public void setup() { 110 tableName = Bytes.toBytes(name.getMethodName()); 111 } 112 113 @After 114 public void teardown() throws IOException { 115 if (region != null) { 116 BlockCache bc = region.getStores().get(0).getCacheConfig().getBlockCache(); 117 region.close(); 118 WAL wal = region.getWAL(); 119 if (wal != null) wal.close(); 120 if (bc != null) bc.shutdown(); 121 region = null; 122 } 123 } 124 ////////////////////////////////////////////////////////////////////////////// 125 // New tests that doesn't spin up a mini cluster but rather just test the 126 // individual code pieces in the HRegion. 127 ////////////////////////////////////////////////////////////////////////////// 128 129 /** 130 * Test basic append operation. 131 * More tests in 132 * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend() 133 */ 134 @Test 135 public void testAppend() throws IOException { 136 initHRegion(tableName, name.getMethodName(), fam1); 137 String v1 = "Ultimate Answer to the Ultimate Question of Life,"+ 138 " The Universe, and Everything"; 139 String v2 = " is... 42."; 140 Append a = new Append(row); 141 a.setReturnResults(false); 142 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); 143 a.addColumn(fam1, qual2, Bytes.toBytes(v2)); 144 assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty()); 145 a = new Append(row); 146 a.addColumn(fam1, qual1, Bytes.toBytes(v2)); 147 a.addColumn(fam1, qual2, Bytes.toBytes(v1)); 148 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 149 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1))); 150 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2))); 151 } 152 153 @Test 154 public void testAppendWithNonExistingFamily() throws IOException { 155 initHRegion(tableName, name.getMethodName(), fam1); 156 final String v1 = "Value"; 157 final Append a = new Append(row); 158 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); 159 a.addColumn(fam2, qual2, Bytes.toBytes(v1)); 160 Result result = null; 161 try { 162 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 163 fail("Append operation should fail with NoSuchColumnFamilyException."); 164 } catch (NoSuchColumnFamilyException e) { 165 assertEquals(null, result); 166 } catch (Exception e) { 167 fail("Append operation should fail with NoSuchColumnFamilyException."); 168 } 169 } 170 171 @Test 172 public void testIncrementWithNonExistingFamily() throws IOException { 173 initHRegion(tableName, name.getMethodName(), fam1); 174 final Increment inc = new Increment(row); 175 inc.addColumn(fam1, qual1, 1); 176 inc.addColumn(fam2, qual2, 1); 177 inc.setDurability(Durability.ASYNC_WAL); 178 try { 179 region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE); 180 } catch (NoSuchColumnFamilyException e) { 181 final Get g = new Get(row); 182 final Result result = region.get(g); 183 assertEquals(null, result.getValue(fam1, qual1)); 184 assertEquals(null, result.getValue(fam2, qual2)); 185 } catch (Exception e) { 186 fail("Increment operation should fail with NoSuchColumnFamilyException."); 187 } 188 } 189 190 /** 191 * Test multi-threaded increments. 192 */ 193 @Test 194 public void testIncrementMultiThreads() throws IOException { 195 boolean fast = true; 196 LOG.info("Starting test testIncrementMultiThreads"); 197 // run a with mixed column families (1 and 3 versions) 198 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2); 199 200 // Create 100 threads, each will increment by its own quantity. All 100 threads update the 201 // same row over two column families. 202 int numThreads = 100; 203 int incrementsPerThread = 1000; 204 Incrementer[] all = new Incrementer[numThreads]; 205 int expectedTotal = 0; 206 // create all threads 207 for (int i = 0; i < numThreads; i++) { 208 all[i] = new Incrementer(region, i, i, incrementsPerThread); 209 expectedTotal += (i * incrementsPerThread); 210 } 211 212 // run all threads 213 for (int i = 0; i < numThreads; i++) { 214 all[i].start(); 215 } 216 217 // wait for all threads to finish 218 for (int i = 0; i < numThreads; i++) { 219 try { 220 all[i].join(); 221 } catch (InterruptedException e) { 222 LOG.info("Ignored", e); 223 } 224 } 225 assertICV(row, fam1, qual1, expectedTotal, fast); 226 assertICV(row, fam1, qual2, expectedTotal*2, fast); 227 assertICV(row, fam2, qual3, expectedTotal*3, fast); 228 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal); 229 } 230 231 232 private void assertICV(byte [] row, 233 byte [] familiy, 234 byte[] qualifier, 235 long amount, 236 boolean fast) throws IOException { 237 // run a get and see? 238 Get get = new Get(row); 239 if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); 240 get.addColumn(familiy, qualifier); 241 Result result = region.get(get); 242 assertEquals(1, result.size()); 243 244 Cell kv = result.rawCells()[0]; 245 long r = Bytes.toLong(CellUtil.cloneValue(kv)); 246 assertEquals(amount, r); 247 } 248 249 private void initHRegion (byte [] tableName, String callingMethod, 250 byte[] ... families) 251 throws IOException { 252 initHRegion(tableName, callingMethod, null, families); 253 } 254 255 private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions, 256 byte[] ... families) 257 throws IOException { 258 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); 259 int i=0; 260 for(byte [] family : families) { 261 HColumnDescriptor hcd = new HColumnDescriptor(family); 262 hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1); 263 htd.addFamily(hcd); 264 } 265 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 266 region = TEST_UTIL.createLocalHRegion(info, htd); 267 } 268 269 /** 270 * A thread that makes increment calls always on the same row, this.row against two column 271 * families on this row. 272 */ 273 public static class Incrementer extends Thread { 274 275 private final Region region; 276 private final int numIncrements; 277 private final int amount; 278 279 280 public Incrementer(Region region, int threadNumber, int amount, int numIncrements) { 281 super("Incrementer." + threadNumber); 282 this.region = region; 283 this.numIncrements = numIncrements; 284 this.amount = amount; 285 setDaemon(true); 286 } 287 288 @Override 289 public void run() { 290 for (int i = 0; i < numIncrements; i++) { 291 try { 292 Increment inc = new Increment(row); 293 inc.addColumn(fam1, qual1, amount); 294 inc.addColumn(fam1, qual2, amount*2); 295 inc.addColumn(fam2, qual3, amount*3); 296 inc.setDurability(Durability.ASYNC_WAL); 297 Result result = region.increment(inc); 298 if (result != null) { 299 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, 300 Bytes.toLong(result.getValue(fam1, qual2))); 301 assertTrue(result.getValue(fam2, qual3) != null); 302 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, 303 Bytes.toLong(result.getValue(fam2, qual3))); 304 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, 305 Bytes.toLong(result.getValue(fam1, qual2))); 306 long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3; 307 long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3)); 308 assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment, 309 fam1Increment, fam2Increment); 310 } 311 } catch (IOException e) { 312 e.printStackTrace(); 313 } 314 } 315 } 316 } 317 318 @Test 319 public void testAppendMultiThreads() throws IOException { 320 LOG.info("Starting test testAppendMultiThreads"); 321 // run a with mixed column families (1 and 3 versions) 322 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2); 323 324 int numThreads = 100; 325 int opsPerThread = 100; 326 AtomicOperation[] all = new AtomicOperation[numThreads]; 327 final byte[] val = new byte[]{1}; 328 329 AtomicInteger failures = new AtomicInteger(0); 330 // create all threads 331 for (int i = 0; i < numThreads; i++) { 332 all[i] = new AtomicOperation(region, opsPerThread, null, failures) { 333 @Override 334 public void run() { 335 for (int i=0; i<numOps; i++) { 336 try { 337 Append a = new Append(row); 338 a.addColumn(fam1, qual1, val); 339 a.addColumn(fam1, qual2, val); 340 a.addColumn(fam2, qual3, val); 341 a.setDurability(Durability.ASYNC_WAL); 342 region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 343 344 Get g = new Get(row); 345 Result result = region.get(g); 346 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length); 347 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length); 348 } catch (IOException e) { 349 e.printStackTrace(); 350 failures.incrementAndGet(); 351 fail(); 352 } 353 } 354 } 355 }; 356 } 357 358 // run all threads 359 for (int i = 0; i < numThreads; i++) { 360 all[i].start(); 361 } 362 363 // wait for all threads to finish 364 for (int i = 0; i < numThreads; i++) { 365 try { 366 all[i].join(); 367 } catch (InterruptedException e) { 368 } 369 } 370 assertEquals(0, failures.get()); 371 Get g = new Get(row); 372 Result result = region.get(g); 373 assertEquals(10000, result.getValue(fam1, qual1).length); 374 assertEquals(10000, result.getValue(fam1, qual2).length); 375 assertEquals(10000, result.getValue(fam2, qual3).length); 376 } 377 /** 378 * Test multi-threaded row mutations. 379 */ 380 @Test 381 public void testRowMutationMultiThreads() throws IOException { 382 LOG.info("Starting test testRowMutationMultiThreads"); 383 initHRegion(tableName, name.getMethodName(), fam1); 384 385 // create 10 threads, each will alternate between adding and 386 // removing a column 387 int numThreads = 10; 388 int opsPerThread = 250; 389 AtomicOperation[] all = new AtomicOperation[numThreads]; 390 391 AtomicLong timeStamps = new AtomicLong(0); 392 AtomicInteger failures = new AtomicInteger(0); 393 // create all threads 394 for (int i = 0; i < numThreads; i++) { 395 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) { 396 @Override 397 public void run() { 398 boolean op = true; 399 for (int i=0; i<numOps; i++) { 400 try { 401 // throw in some flushes 402 if (i%10==0) { 403 synchronized(region) { 404 LOG.debug("flushing"); 405 region.flush(true); 406 if (i%100==0) { 407 region.compact(false); 408 } 409 } 410 } 411 long ts = timeStamps.incrementAndGet(); 412 RowMutations rm = new RowMutations(row); 413 if (op) { 414 Put p = new Put(row, ts); 415 p.addColumn(fam1, qual1, value1); 416 p.setDurability(Durability.ASYNC_WAL); 417 rm.add(p); 418 Delete d = new Delete(row); 419 d.addColumns(fam1, qual2, ts); 420 d.setDurability(Durability.ASYNC_WAL); 421 rm.add(d); 422 } else { 423 Delete d = new Delete(row); 424 d.addColumns(fam1, qual1, ts); 425 d.setDurability(Durability.ASYNC_WAL); 426 rm.add(d); 427 Put p = new Put(row, ts); 428 p.addColumn(fam1, qual2, value2); 429 p.setDurability(Durability.ASYNC_WAL); 430 rm.add(p); 431 } 432 region.mutateRow(rm); 433 op ^= true; 434 // check: should always see exactly one column 435 Get g = new Get(row); 436 Result r = region.get(g); 437 if (r.size() != 1) { 438 LOG.debug(Objects.toString(r)); 439 failures.incrementAndGet(); 440 fail(); 441 } 442 } catch (IOException e) { 443 e.printStackTrace(); 444 failures.incrementAndGet(); 445 fail(); 446 } 447 } 448 } 449 }; 450 } 451 452 // run all threads 453 for (int i = 0; i < numThreads; i++) { 454 all[i].start(); 455 } 456 457 // wait for all threads to finish 458 for (int i = 0; i < numThreads; i++) { 459 try { 460 all[i].join(); 461 } catch (InterruptedException e) { 462 } 463 } 464 assertEquals(0, failures.get()); 465 } 466 467 468 /** 469 * Test multi-threaded region mutations. 470 */ 471 @Test 472 public void testMultiRowMutationMultiThreads() throws IOException { 473 474 LOG.info("Starting test testMultiRowMutationMultiThreads"); 475 initHRegion(tableName, name.getMethodName(), fam1); 476 477 // create 10 threads, each will alternate between adding and 478 // removing a column 479 int numThreads = 10; 480 int opsPerThread = 250; 481 AtomicOperation[] all = new AtomicOperation[numThreads]; 482 483 AtomicLong timeStamps = new AtomicLong(0); 484 AtomicInteger failures = new AtomicInteger(0); 485 final List<byte[]> rowsToLock = Arrays.asList(row, row2); 486 // create all threads 487 for (int i = 0; i < numThreads; i++) { 488 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) { 489 @Override 490 public void run() { 491 boolean op = true; 492 for (int i=0; i<numOps; i++) { 493 try { 494 // throw in some flushes 495 if (i%10==0) { 496 synchronized(region) { 497 LOG.debug("flushing"); 498 region.flush(true); 499 if (i%100==0) { 500 region.compact(false); 501 } 502 } 503 } 504 long ts = timeStamps.incrementAndGet(); 505 List<Mutation> mrm = new ArrayList<>(); 506 if (op) { 507 Put p = new Put(row2, ts); 508 p.addColumn(fam1, qual1, value1); 509 p.setDurability(Durability.ASYNC_WAL); 510 mrm.add(p); 511 Delete d = new Delete(row); 512 d.addColumns(fam1, qual1, ts); 513 d.setDurability(Durability.ASYNC_WAL); 514 mrm.add(d); 515 } else { 516 Delete d = new Delete(row2); 517 d.addColumns(fam1, qual1, ts); 518 d.setDurability(Durability.ASYNC_WAL); 519 mrm.add(d); 520 Put p = new Put(row, ts); 521 p.setDurability(Durability.ASYNC_WAL); 522 p.addColumn(fam1, qual1, value2); 523 mrm.add(p); 524 } 525 region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); 526 op ^= true; 527 // check: should always see exactly one column 528 Scan s = new Scan(row); 529 RegionScanner rs = region.getScanner(s); 530 List<Cell> r = new ArrayList<>(); 531 while (rs.next(r)) 532 ; 533 rs.close(); 534 if (r.size() != 1) { 535 LOG.debug(Objects.toString(r)); 536 failures.incrementAndGet(); 537 fail(); 538 } 539 } catch (IOException e) { 540 e.printStackTrace(); 541 failures.incrementAndGet(); 542 fail(); 543 } 544 } 545 } 546 }; 547 } 548 549 // run all threads 550 for (int i = 0; i < numThreads; i++) { 551 all[i].start(); 552 } 553 554 // wait for all threads to finish 555 for (int i = 0; i < numThreads; i++) { 556 try { 557 all[i].join(); 558 } catch (InterruptedException e) { 559 } 560 } 561 assertEquals(0, failures.get()); 562 } 563 564 public static class AtomicOperation extends Thread { 565 protected final HRegion region; 566 protected final int numOps; 567 protected final AtomicLong timeStamps; 568 protected final AtomicInteger failures; 569 protected final Random r = new Random(); 570 571 public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, 572 AtomicInteger failures) { 573 this.region = region; 574 this.numOps = numOps; 575 this.timeStamps = timeStamps; 576 this.failures = failures; 577 } 578 } 579 580 private static CountDownLatch latch = new CountDownLatch(1); 581 private enum TestStep { 582 INIT, // initial put of 10 to set value of the cell 583 PUT_STARTED, // began doing a put of 50 to cell 584 PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC). 585 CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11 586 CHECKANDPUT_COMPLETED // completed checkAndPut 587 // NOTE: at the end of these steps, the value of the cell should be 50, not 11! 588 } 589 private static volatile TestStep testStep = TestStep.INIT; 590 private final String family = "f1"; 591 592 /** 593 * Test written as a verifier for HBASE-7051, CheckAndPut should properly read 594 * MVCC. 595 * 596 * Moved into TestAtomicOperation from its original location, TestHBase7051 597 */ 598 @Test 599 public void testPutAndCheckAndPutInParallel() throws Exception { 600 Configuration conf = TEST_UTIL.getConfiguration(); 601 conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class); 602 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())) 603 .addFamily(new HColumnDescriptor(family)); 604 this.region = TEST_UTIL.createLocalHRegion(htd, null, null); 605 Put[] puts = new Put[1]; 606 Put put = new Put(Bytes.toBytes("r1")); 607 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10")); 608 puts[0] = put; 609 610 region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE); 611 MultithreadedTestUtil.TestContext ctx = 612 new MultithreadedTestUtil.TestContext(conf); 613 ctx.addThread(new PutThread(ctx, region)); 614 ctx.addThread(new CheckAndPutThread(ctx, region)); 615 ctx.startThreads(); 616 while (testStep != TestStep.CHECKANDPUT_COMPLETED) { 617 Thread.sleep(100); 618 } 619 ctx.stop(); 620 Scan s = new Scan(); 621 RegionScanner scanner = region.getScanner(s); 622 List<Cell> results = new ArrayList<>(); 623 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build(); 624 scanner.next(results, scannerContext); 625 for (Cell keyValue : results) { 626 assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue))); 627 } 628 } 629 630 private class PutThread extends TestThread { 631 private Region region; 632 PutThread(TestContext ctx, Region region) { 633 super(ctx); 634 this.region = region; 635 } 636 637 @Override 638 public void doWork() throws Exception { 639 Put[] puts = new Put[1]; 640 Put put = new Put(Bytes.toBytes("r1")); 641 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50")); 642 puts[0] = put; 643 testStep = TestStep.PUT_STARTED; 644 region.batchMutate(puts); 645 } 646 } 647 648 private class CheckAndPutThread extends TestThread { 649 private Region region; 650 CheckAndPutThread(TestContext ctx, Region region) { 651 super(ctx); 652 this.region = region; 653 } 654 655 @Override 656 public void doWork() throws Exception { 657 Put[] puts = new Put[1]; 658 Put put = new Put(Bytes.toBytes("r1")); 659 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11")); 660 puts[0] = put; 661 while (testStep != TestStep.PUT_COMPLETED) { 662 Thread.sleep(100); 663 } 664 testStep = TestStep.CHECKANDPUT_STARTED; 665 region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"), 666 CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put); 667 testStep = TestStep.CHECKANDPUT_COMPLETED; 668 } 669 } 670 671 public static class MockHRegion extends HRegion { 672 673 public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf, 674 final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) { 675 super(tableDir, log, fs, conf, regionInfo, htd, rsServices); 676 } 677 678 @Override 679 public RowLock getRowLockInternal(final byte[] row, boolean readLock, 680 final RowLock prevRowlock) throws IOException { 681 if (testStep == TestStep.CHECKANDPUT_STARTED) { 682 latch.countDown(); 683 } 684 return new WrappedRowLock(super.getRowLockInternal(row, readLock, null)); 685 } 686 687 public class WrappedRowLock implements RowLock { 688 689 private final RowLock rowLock; 690 691 private WrappedRowLock(RowLock rowLock) { 692 this.rowLock = rowLock; 693 } 694 695 696 @Override 697 public void release() { 698 if (testStep == TestStep.INIT) { 699 this.rowLock.release(); 700 return; 701 } 702 703 if (testStep == TestStep.PUT_STARTED) { 704 try { 705 testStep = TestStep.PUT_COMPLETED; 706 this.rowLock.release(); 707 // put has been written to the memstore and the row lock has been released, but the 708 // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of 709 // operations would cause the non-atomicity to show up: 710 // 1) Put releases row lock (where we are now) 711 // 2) CheckAndPut grabs row lock and reads the value prior to the put (10) 712 // because the MVCC has not advanced 713 // 3) Put advances MVCC 714 // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock 715 // (see below), and then wait some more to give the checkAndPut time to read the old 716 // value. 717 latch.await(); 718 Thread.sleep(1000); 719 } catch (InterruptedException e) { 720 Thread.currentThread().interrupt(); 721 } 722 } 723 else if (testStep == TestStep.CHECKANDPUT_STARTED) { 724 this.rowLock.release(); 725 } 726 } 727 } 728 } 729}