/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing of HRegion.incrementColumnValue, HRegion.increment, and HRegion.append
 */
@Category({ VerySlowRegionServerTests.class, LargeTests.class }) // Starts 100 threads
public class TestAtomicOperation {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAtomicOperation.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class);
  @Rule
  public TestName name = new TestName();

  /** Region under test; created by each test method and closed in {@link #teardown()}. */
  HRegion region = null;
  private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();

  // Test names
  static byte[] tableName;
  static final byte[] qual1 = Bytes.toBytes("qual1");
  static final byte[] qual2 = Bytes.toBytes("qual2");
  static final byte[] qual3 = Bytes.toBytes("qual3");
  static final byte[] value1 = Bytes.toBytes("value1");
  static final byte[] value2 = Bytes.toBytes("value2");
  static final byte[] row = Bytes.toBytes("rowA");
  static final byte[] row2 = Bytes.toBytes("rowB");

  /** Derives the table name for the current test from the test method name. */
  @Before
  public void setup() {
    tableName = Bytes.toBytes(name.getMethodName());
  }

  /**
   * Closes the region, its WAL (when present) and shuts down the block cache of the first store,
   * then clears the {@link #region} reference. Does nothing when no region was created.
   */
  @After
  public void teardown() throws IOException {
    if (region != null) {
      // Grab the cache config before closing; it is needed afterwards to shut the cache down.
      CacheConfig cacheConfig = region.getStores().get(0).getCacheConfig();
      region.close();
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.close();
      }
      cacheConfig.getBlockCache().ifPresent(BlockCache::shutdown);
      region = null;
    }
  }

  //////////////////////////////////////////////////////////////////////////////
  // New tests that don't spin up a mini cluster but rather just test the
  // individual code pieces in the HRegion.
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Test basic append operation. More tests in
   * {@code org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()}.
   */
  @Test
  public void testAppend() throws IOException {
    initHRegion(tableName, name.getMethodName(), fam1);
    String v1 =
      "Ultimate Answer to the Ultimate Question of Life," + " The Universe, and Everything";
    String v2 = " is... 42.";
    Append a = new Append(row);
    a.setReturnResults(false);
    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
    a.addColumn(fam1, qual2, Bytes.toBytes(v2));
    // With return-results disabled the first append must hand back an empty Result.
    assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty());
    a = new Append(row);
    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
    a.addColumn(fam1, qual2, Bytes.toBytes(v1));
    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), result.getValue(fam1, qual1)));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), result.getValue(fam1, qual2)));
  }

  /**
   * Appends to columns spread over three families and verifies that every appended value is
   * returned, both for columns that already held data and for columns written for the first time.
   */
  @Test
  public void testAppendWithMultipleFamilies() throws IOException {
    final byte[] fam3 = Bytes.toBytes("colfamily31");
    initHRegion(tableName, name.getMethodName(), fam1, fam2, fam3);
    String v1 = "Appended";
    String v2 = "Value";

    Append a = new Append(row);
    a.setReturnResults(false);
    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
    a.addColumn(fam2, qual2, Bytes.toBytes(v2));
    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
    assertTrue("Expected an empty result but result contains " + result.size() + " keys",
      result.isEmpty());

    a = new Append(row);
    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
    a.addColumn(fam3, qual3, Bytes.toBytes(v2));
    a.addColumn(fam1, qual2, Bytes.toBytes(v1));

    result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);

    byte[] actualValue1 = result.getValue(fam1, qual1);
    byte[] actualValue2 = result.getValue(fam2, qual2);
    byte[] actualValue3 = result.getValue(fam3, qual3);
    byte[] actualValue4 = result.getValue(fam1, qual2);

    assertNotNull("Value1 should not be null", actualValue1);
    assertNotNull("Value2 should not be null", actualValue2);
    assertNotNull("Value3 should not be null", actualValue3);
    assertNotNull("Value4 should not be null", actualValue4);
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), actualValue1));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), actualValue2));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2), actualValue3));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1), actualValue4));
  }

  /**
   * An append that references a family the table does not have must be rejected with
   * {@link NoSuchColumnFamilyException} and must not return a result.
   */
  @Test
  public void testAppendWithNonExistingFamily() throws IOException {
    initHRegion(tableName, name.getMethodName(), fam1);
    final String v1 = "Value";
    final Append a = new Append(row);
    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
    Result result = null;
    try {
      result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
      // fail() throws AssertionError, which is not an Exception and so escapes both catches.
      fail("Append operation should fail with NoSuchColumnFamilyException.");
    } catch (NoSuchColumnFamilyException e) {
      assertEquals(null, result);
    } catch (Exception e) {
      fail("Append operation should fail with NoSuchColumnFamilyException.");
    }
  }

  /**
   * An increment that references a family the table does not have must be rejected with
   * {@link NoSuchColumnFamilyException} and must leave no partial data behind.
   */
  @Test
  public void testIncrementWithNonExistingFamily() throws IOException {
    initHRegion(tableName, name.getMethodName(), fam1);
    final Increment inc = new Increment(row);
    inc.addColumn(fam1, qual1, 1);
    inc.addColumn(fam2, qual2, 1);
    inc.setDurability(Durability.ASYNC_WAL);
    try {
      region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
      // Without this the test would pass silently when no exception is thrown at all.
      fail("Increment operation should fail with NoSuchColumnFamilyException.");
    } catch (NoSuchColumnFamilyException e) {
      final Get g = new Get(row);
      final Result result = region.get(g);
      assertEquals(null, result.getValue(fam1, qual1));
      assertEquals(null, result.getValue(fam2, qual2));
    } catch (Exception e) {
      fail("Increment operation should fail with NoSuchColumnFamilyException.");
    }
  }

  /**
   * Test multi-threaded increments.
   */
  @Test
  public void testIncrementMultiThreads() throws IOException {
    boolean fast = true;
    LOG.info("Starting test testIncrementMultiThreads");
    // run with mixed column families (1 and 3 versions)
    initHRegion(tableName, name.getMethodName(), new int[] { 1, 3 }, fam1, fam2);

    // Create 100 threads, each will increment by its own quantity. All 100 threads update the
    // same row over two column families.
    int numThreads = 100;
    int incrementsPerThread = 1000;
    Incrementer[] all = new Incrementer[numThreads];
    int expectedTotal = 0;
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new Incrementer(region, i, i, incrementsPerThread);
      expectedTotal += (i * incrementsPerThread);
    }

    startAndJoinAll(all);

    assertICV(row, fam1, qual1, expectedTotal, fast);
    assertICV(row, fam1, qual2, expectedTotal * 2, fast);
    assertICV(row, fam2, qual3, expectedTotal * 3, fast);
    LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
  }

  /**
   * Starts every given thread and then blocks until all of them have terminated. If the calling
   * thread is interrupted while waiting, its interrupt flag is restored and the test is failed,
   * because workers may still be running and any later assertion would be meaningless.
   * @param threads the worker threads to run to completion
   */
  private static void startAndJoinAll(Thread[] threads) {
    for (Thread worker : threads) {
      worker.start();
    }
    for (Thread worker : threads) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        fail("Interrupted while waiting for " + worker.getName() + " to finish");
      }
    }
  }

  /**
   * Reads a single column of a row from {@link #region} and asserts that it holds exactly one
   * cell whose value, decoded as a long, equals {@code amount}.
   * @param row       row to read
   * @param familiy   column family to read
   * @param qualifier column qualifier to read
   * @param amount    expected counter value
   * @param fast      when true the read uses {@link IsolationLevel#READ_UNCOMMITTED}
   */
  private void assertICV(byte[] row, byte[] familiy, byte[] qualifier, long amount, boolean fast)
    throws IOException {
    // run a get and see?
    Get get = new Get(row);
    if (fast) {
      get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    }
    get.addColumn(familiy, qualifier);
    Result result = region.get(get);
    assertEquals(1, result.size());

    Cell kv = result.rawCells()[0];
    long r = Bytes.toLong(CellUtil.cloneValue(kv));
    assertEquals(amount, r);
  }

  /** Creates {@link #region} with the given families, each keeping a single version. */
  private void initHRegion(byte[] tableName, String callingMethod, byte[]... families)
    throws IOException {
    initHRegion(tableName, callingMethod, null, families);
  }

  /**
   * Creates {@link #region} as a local region with the given families.
   * @param tableName     name of the table to create
   * @param callingMethod name of the calling test; not used by this method
   * @param maxVersions   max versions per family, positionally matched to {@code families}, or
   *                      null to use 1 for every family
   * @param families      column families to add to the table
   */
  private void initHRegion(byte[] tableName, String callingMethod, int[] maxVersions,
    byte[]... families) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
    int i = 0;
    for (byte[] family : families) {
      HColumnDescriptor hcd = new HColumnDescriptor(family);
      hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
      htd.addFamily(hcd);
    }
    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
    region = TEST_UTIL.createLocalHRegion(info, htd);
  }

  /**
   * A thread that makes increment calls always on the same row, this.row against two column
   * families on this row.
   */
  public static class Incrementer extends Thread {

    private final Region region;
    private final int numIncrements;
    private final int amount;

    /**
     * @param region        region to increment against
     * @param threadNumber  used only to build the thread name
     * @param amount        base amount; the three columns are bumped by 1x, 2x and 3x this value
     * @param numIncrements number of increment calls to issue
     */
    public Incrementer(Region region, int threadNumber, int amount, int numIncrements) {
      super("Incrementer." + threadNumber);
      this.region = region;
      this.numIncrements = numIncrements;
      this.amount = amount;
      setDaemon(true);
    }

    /**
     * Issues the increments and checks on every returned result that the three counters keep
     * their 1:2:3 ratio. An IOException is logged and the loop continues; an AssertionError is
     * not caught here and ends this thread.
     */
    @Override
    public void run() {
      for (int i = 0; i < numIncrements; i++) {
        try {
          Increment inc = new Increment(row);
          inc.addColumn(fam1, qual1, amount);
          inc.addColumn(fam1, qual2, amount * 2);
          inc.addColumn(fam2, qual3, amount * 3);
          inc.setDurability(Durability.ASYNC_WAL);
          Result result = region.increment(inc);
          if (result != null) {
            assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2,
              Bytes.toLong(result.getValue(fam1, qual2)));
            assertTrue(result.getValue(fam2, qual3) != null);
            assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 3,
              Bytes.toLong(result.getValue(fam2, qual3)));
            long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1)) * 3;
            long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
            assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment, fam1Increment,
              fam2Increment);
          }
        } catch (IOException e) {
          LOG.error("Increment failed in " + getName(), e);
        }
      }
    }
  }

  /**
   * Test multi-threaded appends: every thread appends one byte to three columns of the same row
   * and checks that all three columns always have the same length.
   */
  @Test
  public void testAppendMultiThreads() throws IOException {
    LOG.info("Starting test testAppendMultiThreads");
    // run with mixed column families (1 and 3 versions)
    initHRegion(tableName, name.getMethodName(), new int[] { 1, 3 }, fam1, fam2);

    int numThreads = 100;
    int opsPerThread = 100;
    AtomicOperation[] all = new AtomicOperation[numThreads];
    final byte[] val = new byte[] { 1 };

    AtomicInteger failures = new AtomicInteger(0);
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
        @Override
        public void run() {
          for (int opIdx = 0; opIdx < numOps; opIdx++) {
            try {
              Append a = new Append(row);
              a.addColumn(fam1, qual1, val);
              a.addColumn(fam1, qual2, val);
              a.addColumn(fam2, qual3, val);
              a.setDurability(Durability.ASYNC_WAL);
              region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);

              Get g = new Get(row);
              Result result = region.get(g);
              assertEquals(result.getValue(fam1, qual1).length,
                result.getValue(fam1, qual2).length);
              assertEquals(result.getValue(fam1, qual1).length,
                result.getValue(fam2, qual3).length);
            } catch (IOException | AssertionError e) {
              // An AssertionError raised on a worker thread never reaches JUnit, so it has to be
              // counted here for the main thread to notice it.
              LOG.error("Append worker failed", e);
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    startAndJoinAll(all);

    assertEquals(0, failures.get());
    Get g = new Get(row);
    Result result = region.get(g);
    // Each operation appends exactly one byte to each column.
    int expectedLength = numThreads * opsPerThread;
    assertEquals(expectedLength, result.getValue(fam1, qual1).length);
    assertEquals(expectedLength, result.getValue(fam1, qual2).length);
    assertEquals(expectedLength, result.getValue(fam2, qual3).length);
  }

  /**
   * Test multi-threaded row mutations.
   */
  @Test
  public void testRowMutationMultiThreads() throws IOException {
    LOG.info("Starting test testRowMutationMultiThreads");
    initHRegion(tableName, name.getMethodName(), fam1);

    // create 10 threads, each will alternate between adding and
    // removing a column
    int numThreads = 10;
    int opsPerThread = 250;
    AtomicOperation[] all = new AtomicOperation[numThreads];

    AtomicLong timeStamps = new AtomicLong(0);
    AtomicInteger failures = new AtomicInteger(0);
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
        @Override
        public void run() {
          boolean op = true;
          for (int opIdx = 0; opIdx < numOps; opIdx++) {
            try {
              // throw in some flushes
              if (opIdx % 10 == 0) {
                synchronized (region) {
                  LOG.debug("flushing");
                  region.flush(true);
                  if (opIdx % 100 == 0) {
                    region.compact(false);
                  }
                }
              }
              long ts = timeStamps.incrementAndGet();
              RowMutations rm = new RowMutations(row);
              if (op) {
                Put p = new Put(row, ts);
                p.addColumn(fam1, qual1, value1);
                p.setDurability(Durability.ASYNC_WAL);
                rm.add(p);
                Delete d = new Delete(row);
                d.addColumns(fam1, qual2, ts);
                d.setDurability(Durability.ASYNC_WAL);
                rm.add(d);
              } else {
                Delete d = new Delete(row);
                d.addColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                rm.add(d);
                Put p = new Put(row, ts);
                p.addColumn(fam1, qual2, value2);
                p.setDurability(Durability.ASYNC_WAL);
                rm.add(p);
              }
              region.mutateRow(rm);
              op ^= true;
              // check: should always see exactly one column
              Get g = new Get(row);
              Result r = region.get(g);
              if (r.size() != 1) {
                LOG.debug(Objects.toString(r));
                failures.incrementAndGet();
                fail();
              }
            } catch (IOException e) {
              LOG.error("Row mutation worker failed", e);
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    startAndJoinAll(all);

    assertEquals(0, failures.get());
  }

  /**
   * Test multi-threaded region mutations.
   */
  @Test
  public void testMultiRowMutationMultiThreads() throws IOException {

    LOG.info("Starting test testMultiRowMutationMultiThreads");
    initHRegion(tableName, name.getMethodName(), fam1);

    // create 10 threads, each will alternate between adding and
    // removing a column
    int numThreads = 10;
    int opsPerThread = 250;
    AtomicOperation[] all = new AtomicOperation[numThreads];

    AtomicLong timeStamps = new AtomicLong(0);
    AtomicInteger failures = new AtomicInteger(0);
    final List<byte[]> rowsToLock = Arrays.asList(row, row2);
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
        @Override
        public void run() {
          boolean op = true;
          for (int opIdx = 0; opIdx < numOps; opIdx++) {
            try {
              // throw in some flushes
              if (opIdx % 10 == 0) {
                synchronized (region) {
                  LOG.debug("flushing");
                  region.flush(true);
                  if (opIdx % 100 == 0) {
                    region.compact(false);
                  }
                }
              }
              long ts = timeStamps.incrementAndGet();
              List<Mutation> mrm = new ArrayList<>();
              if (op) {
                Put p = new Put(row2, ts);
                p.addColumn(fam1, qual1, value1);
                p.setDurability(Durability.ASYNC_WAL);
                mrm.add(p);
                Delete d = new Delete(row);
                d.addColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                mrm.add(d);
              } else {
                Delete d = new Delete(row2);
                d.addColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                mrm.add(d);
                Put p = new Put(row, ts);
                p.setDurability(Durability.ASYNC_WAL);
                p.addColumn(fam1, qual1, value2);
                mrm.add(p);
              }
              region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
              op ^= true;
              // check: should always see exactly one column
              Scan s = new Scan(row);
              List<Cell> r = new ArrayList<>();
              // try-with-resources so the scanner is closed even when next() throws
              try (RegionScanner rs = region.getScanner(s)) {
                while (rs.next(r)) {
                  // keep draining; cells accumulate in r
                }
              }
              if (r.size() != 1) {
                LOG.debug(Objects.toString(r));
                failures.incrementAndGet();
                fail();
              }
            } catch (IOException e) {
              LOG.error("Multi-row mutation worker failed", e);
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    startAndJoinAll(all);

    assertEquals(0, failures.get());
  }

  /**
   * Base class for the worker threads of the multi-threaded tests. It only carries the shared
   * state; subclasses supply {@code run()}.
   */
  public static class AtomicOperation extends Thread {
    protected final HRegion region;
    protected final int numOps;
    protected final AtomicLong timeStamps;
    protected final AtomicInteger failures;

    /**
     * @param region     region the worker operates on
     * @param numOps     number of operations the worker should perform
     * @param timeStamps shared timestamp source; null for workers that do not need one
     * @param failures   shared counter the worker bumps on every failure
     */
    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
      AtomicInteger failures) {
      this.region = region;
      this.numOps = numOps;
      this.timeStamps = timeStamps;
      this.failures = failures;
    }
  }

  /** Counted down once the checkAndPut asks for its row lock; the put's release waits on it. */
  private static CountDownLatch latch = new CountDownLatch(1);

  private enum TestStep {
    INIT, // initial put of 10 to set value of the cell
    PUT_STARTED, // began doing a put of 50 to cell
    PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC).
    CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11
    CHECKANDPUT_COMPLETED // completed checkAndPut
    // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
  }

  private static volatile TestStep testStep = TestStep.INIT;
  private final String family = "f1";

  /**
   * Test written as a verifier for HBASE-7051, CheckAndPut should properly read MVCC. Moved into
   * TestAtomicOperation from its original location, TestHBase7051
   */
  @Test
  public void testPutAndCheckAndPutInParallel() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // Make the testing utility instantiate MockHRegion so row-lock calls can be intercepted.
    conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()))
      .addFamily(new HColumnDescriptor(family));
    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
    Put[] puts = new Put[1];
    Put put = new Put(Bytes.toBytes("r1"));
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
    puts[0] = put;

    region.batchMutate(puts);
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
    ctx.addThread(new PutThread(ctx, region));
    ctx.addThread(new CheckAndPutThread(ctx, region));
    ctx.startThreads();
    // Poll until the checkAndPut thread reports completion.
    while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
      Thread.sleep(100);
    }
    ctx.stop();
    Scan s = new Scan();
    List<Cell> results = new ArrayList<>();
    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
    // try-with-resources so the scanner is always closed
    try (RegionScanner scanner = region.getScanner(s)) {
      scanner.next(results, scannerContext);
    }
    for (Cell keyValue : results) {
      assertEquals("50", Bytes.toString(CellUtil.cloneValue(keyValue)));
    }
  }

  /** Writes the value "50" to r1/f1:q1 after moving the test to PUT_STARTED. */
  private class PutThread extends TestThread {
    private final Region region;

    PutThread(TestContext ctx, Region region) {
      super(ctx);
      this.region = region;
    }

    @Override
    public void doWork() throws Exception {
      Put[] puts = new Put[1];
      Put put = new Put(Bytes.toBytes("r1"));
      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
      puts[0] = put;
      testStep = TestStep.PUT_STARTED;
      region.batchMutate(puts);
    }
  }

  /**
   * Waits for PUT_COMPLETED, then runs a checkAndMutate that would write "11" if the cell still
   * held "10", and finally moves the test to CHECKANDPUT_COMPLETED.
   */
  private class CheckAndPutThread extends TestThread {
    private final Region region;

    CheckAndPutThread(TestContext ctx, Region region) {
      super(ctx);
      this.region = region;
    }

    @Override
    public void doWork() throws Exception {
      Put[] puts = new Put[1];
      Put put = new Put(Bytes.toBytes("r1"));
      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
      puts[0] = put;
      while (testStep != TestStep.PUT_COMPLETED) {
        Thread.sleep(100);
      }
      testStep = TestStep.CHECKANDPUT_STARTED;
      region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
        CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put);
      testStep = TestStep.CHECKANDPUT_COMPLETED;
    }
  }

  /**
   * HRegion whose row locks are wrapped so that releasing the put's lock can be stretched out
   * until the checkAndPut has asked for its own lock.
   */
  public static class MockHRegion extends HRegion {

    public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
      final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
    }

    /**
     * Signals {@code latch} when called during CHECKANDPUT_STARTED and returns the real lock
     * wrapped in a {@link WrappedRowLock}. The previous row lock is deliberately not forwarded to
     * the superclass; null is passed instead.
     */
    @Override
    protected RowLock getRowLockInternal(final byte[] row, boolean readLock,
      final RowLock prevRowlock) throws IOException {
      if (testStep == TestStep.CHECKANDPUT_STARTED) {
        latch.countDown();
      }
      return new WrappedRowLock(super.getRowLockInternal(row, readLock, null));
    }

    /** Row lock decorator whose {@link #release()} behaviour depends on the current test step. */
    public class WrappedRowLock implements RowLock {

      private final RowLock rowLock;

      private WrappedRowLock(RowLock rowLock) {
        this.rowLock = rowLock;
      }

      @Override
      public void release() {
        if (testStep == TestStep.PUT_STARTED) {
          try {
            testStep = TestStep.PUT_COMPLETED;
            this.rowLock.release();
            // put has been written to the memstore and the row lock has been released, but the
            // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
            // operations would cause the non-atomicity to show up:
            // 1) Put releases row lock (where we are now)
            // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
            // because the MVCC has not advanced
            // 3) Put advances MVCC
            // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
            // (see getRowLockInternal), and then wait some more to give the checkAndPut time to
            // read the old value.
            latch.await();
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else {
          // Every other step releases immediately, so no lock is left held in any state.
          this.rowLock.release();
        }
      }
    }
  }
}