/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing of HRegion.incrementColumnValue, HRegion.increment, and HRegion.append
 */
@Category({ VerySlowRegionServerTests.class, LargeTests.class }) // Starts 100 threads
public class TestAtomicOperation {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAtomicOperation.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class);
  @Rule
  public TestName name = new TestName();

  HRegion region = null;
  private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();

  // Test fixtures
  static byte[] tableName;
  static final byte[] qual1 = Bytes.toBytes("qual1");
  static final byte[] qual2 = Bytes.toBytes("qual2");
  static final byte[] qual3 = Bytes.toBytes("qual3");
  static final byte[] value1 = Bytes.toBytes("value1");
  static final byte[] value2 = Bytes.toBytes("value2");
  static final byte[] row = Bytes.toBytes("rowA");
  static final byte[] row2 = Bytes.toBytes("rowB");

  @Before
  public void setup() {
    tableName = Bytes.toBytes(name.getMethodName());
  }

  @After
  public void teardown() throws IOException {
    if (region != null) {
      CacheConfig cacheConfig = region.getStores().get(0).getCacheConfig();
      region.close();
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.close();
      }
      cacheConfig.getBlockCache().ifPresent(BlockCache::shutdown);
      region = null;
    }
  }

  //////////////////////////////////////////////////////////////////////////////
  // New tests that don't spin up a mini cluster but rather just exercise the
  // individual code pieces in HRegion.
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Test basic append operation. More tests in
   * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
   */
  @Test
  public void testAppend() throws IOException {
    initHRegion(tableName, name.getMethodName(), fam1);
    String v1 =
      "Ultimate Answer to the Ultimate Question of Life," + " The Universe, and Everything";
    String v2 = " is... 42.";
    Append a = new Append(row);
    a.setReturnResults(false);
    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
    a.addColumn(fam1, qual2, Bytes.toBytes(v2));
    assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty());
    a = new Append(row);
    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
    a.addColumn(fam1, qual2, Bytes.toBytes(v1));
    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), result.getValue(fam1, qual1)));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), result.getValue(fam1, qual2)));
  }

  @Test
  public void testAppendWithMultipleFamilies() throws IOException {
    final byte[] fam3 = Bytes.toBytes("colfamily31");
    initHRegion(tableName, name.getMethodName(), fam1, fam2, fam3);
    String v1 = "Appended";
    String v2 = "Value";

    Append a = new Append(row);
    a.setReturnResults(false);
    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
    a.addColumn(fam2, qual2, Bytes.toBytes(v2));
    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
    assertTrue("Expected an empty result but result contains " + result.size() + " keys",
      result.isEmpty());

    a = new Append(row);
    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
    a.addColumn(fam3, qual3, Bytes.toBytes(v2));
    a.addColumn(fam1, qual2, Bytes.toBytes(v1));

    result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);

    byte[] actualValue1 = result.getValue(fam1, qual1);
    byte[] actualValue2 = result.getValue(fam2, qual2);
    byte[] actualValue3 = result.getValue(fam3, qual3);
    byte[] actualValue4 = result.getValue(fam1, qual2);

    assertNotNull("Value1 should not be null", actualValue1);
    assertNotNull("Value2 should not be null", actualValue2);
    assertNotNull("Value3 should not be null", actualValue3);
    assertNotNull("Value4 should not be null", actualValue4);
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), actualValue1));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), actualValue2));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2), actualValue3));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1), actualValue4));
  }

  @Test
  public void testAppendWithNonExistingFamily() throws IOException {
    initHRegion(tableName, name.getMethodName(), fam1);
    final String v1 = "Value";
    final Append a = new Append(row);
    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
    Result result = null;
    try {
      result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
      fail("Append operation should fail with NoSuchColumnFamilyException.");
    } catch (NoSuchColumnFamilyException e) {
      assertEquals(null, result);
    } catch (Exception e) {
      fail("Append operation should fail with NoSuchColumnFamilyException.");
    }
  }

  @Test
  public void testIncrementWithNonExistingFamily() throws IOException {
    initHRegion(tableName, name.getMethodName(), fam1);
    final Increment inc = new Increment(row);
    inc.addColumn(fam1, qual1, 1);
    inc.addColumn(fam2, qual2, 1);
    inc.setDurability(Durability.ASYNC_WAL);
    try {
      region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
      fail("Increment operation should fail with NoSuchColumnFamilyException.");
    } catch (NoSuchColumnFamilyException e) {
      final Get g = new Get(row);
      final Result result = region.get(g);
      assertEquals(null, result.getValue(fam1, qual1));
      assertEquals(null, result.getValue(fam2, qual2));
    } catch (Exception e) {
      fail("Increment operation should fail with NoSuchColumnFamilyException.");
    }
  }

  /**
   * Test multi-threaded increments.
   */
  @Test
  public void testIncrementMultiThreads() throws IOException {
    boolean fast = true;
    LOG.info("Starting test testIncrementMultiThreads");
    // run with mixed column families (1 and 3 versions)
    initHRegion(tableName, name.getMethodName(), new int[] { 1, 3 }, fam1, fam2);

    // Create 100 threads, each will increment by its own quantity. All 100 threads update the
    // same row over two column families.
    int numThreads = 100;
    int incrementsPerThread = 1000;
    Incrementer[] all = new Incrementer[numThreads];
    int expectedTotal = 0;
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new Incrementer(region, i, i, incrementsPerThread);
      expectedTotal += (i * incrementsPerThread);
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
        LOG.info("Ignored", e);
      }
    }
    assertICV(row, fam1, qual1, expectedTotal, fast);
    assertICV(row, fam1, qual2, expectedTotal * 2, fast);
    assertICV(row, fam2, qual3, expectedTotal * 3, fast);
    LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
  }

  private void assertICV(byte[] row, byte[] family, byte[] qualifier, long amount, boolean fast)
    throws IOException {
    // Run a Get and verify the accumulated value. When 'fast' is set, read at READ_UNCOMMITTED
    // isolation (no MVCC read-point filtering).
    Get get = new Get(row);
    if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    get.addColumn(family, qualifier);
    Result result = region.get(get);
    assertEquals(1, result.size());

    Cell kv = result.rawCells()[0];
    long r = Bytes.toLong(CellUtil.cloneValue(kv));
    assertEquals(amount, r);
  }

  private void initHRegion(byte[] tableName, String callingMethod, byte[]... families)
    throws IOException {
    initHRegion(tableName, callingMethod, null, families);
  }

  private void initHRegion(byte[] tableName, String callingMethod, int[] maxVersions,
    byte[]... families) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
    int i = 0;
    for (byte[] family : families) {
      HColumnDescriptor hcd = new HColumnDescriptor(family);
      hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
      htd.addFamily(hcd);
    }
    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
    region = TEST_UTIL.createLocalHRegion(info, htd);
  }

  /**
   * A thread that repeatedly issues increment calls against the same row ({@code row}), touching
   * two column families on that row.
   */
  public static class Incrementer extends Thread {

    private final Region region;
    private final int numIncrements;
    private final int amount;

    public Incrementer(Region region, int threadNumber, int amount, int numIncrements) {
      super("Incrementer." + threadNumber);
      this.region = region;
      this.numIncrements = numIncrements;
      this.amount = amount;
      setDaemon(true);
    }

    @Override
    public void run() {
      for (int i = 0; i < numIncrements; i++) {
        try {
          Increment inc = new Increment(row);
          inc.addColumn(fam1, qual1, amount);
          inc.addColumn(fam1, qual2, amount * 2);
          inc.addColumn(fam2, qual3, amount * 3);
          inc.setDurability(Durability.ASYNC_WAL);
          Result result = region.increment(inc);
          if (result != null) {
            // Within a single increment result the columns must stay in lock-step:
            // qual2 = 2 * qual1 and qual3 = 3 * qual1.
            assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2,
              Bytes.toLong(result.getValue(fam1, qual2)));
            assertNotNull(result.getValue(fam2, qual3));
            long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1)) * 3;
            long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
            assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment, fam1Increment,
              fam2Increment);
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

  @Test
  public void testAppendMultiThreads() throws IOException {
    LOG.info("Starting test testAppendMultiThreads");
    // run with mixed column families (1 and 3 versions)
    initHRegion(tableName, name.getMethodName(), new int[] { 1, 3 }, fam1, fam2);

    int numThreads = 100;
    int opsPerThread = 100;
    AtomicOperation[] all = new AtomicOperation[numThreads];
    final byte[] val = new byte[] { 1 };

    AtomicInteger failures = new AtomicInteger(0);
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
        @Override
        public void run() {
          for (int i = 0; i < numOps; i++) {
            try {
              Append a = new Append(row);
              a.addColumn(fam1, qual1, val);
              a.addColumn(fam1, qual2, val);
              a.addColumn(fam2, qual3, val);
              a.setDurability(Durability.ASYNC_WAL);
              region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);

              Get g = new Get(row);
              Result result = region.get(g);
              assertEquals(result.getValue(fam1, qual1).length,
                result.getValue(fam1, qual2).length);
              assertEquals(result.getValue(fam1, qual1).length,
                result.getValue(fam2, qual3).length);
            } catch (IOException e) {
              e.printStackTrace();
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
      }
    }
    assertEquals(0, failures.get());
    Get g = new Get(row);
    Result result = region.get(g);
    assertEquals(10000, result.getValue(fam1, qual1).length);
    assertEquals(10000, result.getValue(fam1, qual2).length);
    assertEquals(10000, result.getValue(fam2, qual3).length);
  }

  /**
   * Test multi-threaded row mutations.
   */
  @Test
  public void testRowMutationMultiThreads() throws IOException {
    LOG.info("Starting test testRowMutationMultiThreads");
    initHRegion(tableName, name.getMethodName(), fam1);

    // create 10 threads, each will alternate between adding and
    // removing a column
    int numThreads = 10;
    int opsPerThread = 250;
    AtomicOperation[] all = new AtomicOperation[numThreads];

    AtomicLong timeStamps = new AtomicLong(0);
    AtomicInteger failures = new AtomicInteger(0);
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
        @Override
        public void run() {
          boolean op = true;
          for (int i = 0; i < numOps; i++) {
            try {
              // throw in some flushes
              if (i % 10 == 0) {
                synchronized (region) {
                  LOG.debug("flushing");
                  region.flush(true);
                  if (i % 100 == 0) {
                    region.compact(false);
                  }
                }
              }
              long ts = timeStamps.incrementAndGet();
              RowMutations rm = new RowMutations(row);
              if (op) {
                Put p = new Put(row, ts);
                p.addColumn(fam1, qual1, value1);
                p.setDurability(Durability.ASYNC_WAL);
                rm.add(p);
                Delete d = new Delete(row);
                d.addColumns(fam1, qual2, ts);
                d.setDurability(Durability.ASYNC_WAL);
                rm.add(d);
              } else {
                Delete d = new Delete(row);
                d.addColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                rm.add(d);
                Put p = new Put(row, ts);
                p.addColumn(fam1, qual2, value2);
                p.setDurability(Durability.ASYNC_WAL);
                rm.add(p);
              }
              region.mutateRow(rm);
              op ^= true;
              // check: should always see exactly one column
              Get g = new Get(row);
              Result r = region.get(g);
              if (r.size() != 1) {
                LOG.debug(Objects.toString(r));
                failures.incrementAndGet();
                fail();
              }
            } catch (IOException e) {
              e.printStackTrace();
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
      }
    }
    assertEquals(0, failures.get());
  }

  /**
   * Test multi-threaded multi-row mutations.
   */
  @Test
  public void testMultiRowMutationMultiThreads() throws IOException {
    LOG.info("Starting test testMultiRowMutationMultiThreads");
    initHRegion(tableName, name.getMethodName(), fam1);

    // create 10 threads, each will alternate between adding and
    // removing a column
    int numThreads = 10;
    int opsPerThread = 250;
    AtomicOperation[] all = new AtomicOperation[numThreads];

    AtomicLong timeStamps = new AtomicLong(0);
    AtomicInteger failures = new AtomicInteger(0);
    final List<byte[]> rowsToLock = Arrays.asList(row, row2);
    // create all threads
    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
        @Override
        public void run() {
          boolean op = true;
          for (int i = 0; i < numOps; i++) {
            try {
              // throw in some flushes
              if (i % 10 == 0) {
                synchronized (region) {
                  LOG.debug("flushing");
                  region.flush(true);
                  if (i % 100 == 0) {
                    region.compact(false);
                  }
                }
              }
              long ts = timeStamps.incrementAndGet();
              List<Mutation> mrm = new ArrayList<>();
              if (op) {
                Put p = new Put(row2, ts);
                p.addColumn(fam1, qual1, value1);
                p.setDurability(Durability.ASYNC_WAL);
                mrm.add(p);
                Delete d = new Delete(row);
                d.addColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                mrm.add(d);
              } else {
                Delete d = new Delete(row2);
                d.addColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                mrm.add(d);
                Put p = new Put(row, ts);
                p.setDurability(Durability.ASYNC_WAL);
                p.addColumn(fam1, qual1, value2);
                mrm.add(p);
              }
              region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
              op ^= true;
              // check: should always see exactly one column
              Scan s = new Scan(row);
              RegionScanner rs = region.getScanner(s);
              List<Cell> r = new ArrayList<>();
              while (rs.next(r)) {
                // drain the scanner into r
              }
              rs.close();
              if (r.size() != 1) {
                LOG.debug(Objects.toString(r));
                failures.incrementAndGet();
                fail();
              }
            } catch (IOException e) {
              e.printStackTrace();
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
      }
    }
    assertEquals(0, failures.get());
  }

  public static class AtomicOperation extends Thread {
    protected final HRegion region;
    protected final int numOps;
    protected final AtomicLong timeStamps;
    protected final AtomicInteger failures;

    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
      AtomicInteger failures) {
      this.region = region;
      this.numOps = numOps;
      this.timeStamps = timeStamps;
      this.failures = failures;
    }
  }

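  // Coordinates testPutAndCheckAndPutInParallel below: MockHRegion counts this latch down when
  // the checkAndPut thread asks for the row lock, and the put thread's row-lock release waits on
  // it so the checkAndPut reads the pre-put value before the put's MVCC transaction completes.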
  private static CountDownLatch latch = new CountDownLatch(1);

  private enum TestStep {
    INIT, // initial put of 10 to set value of the cell
    PUT_STARTED, // began doing a put of 50 to cell
    PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC).
    CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11
    CHECKANDPUT_COMPLETED // completed checkAndPut
    // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
  }

  private static volatile TestStep testStep = TestStep.INIT;
  private final String family = "f1";

  /**
   * Test written as a verifier for HBASE-7051: checkAndPut should properly read MVCC. Moved into
   * TestAtomicOperation from its original location, TestHBase7051.
   */
  @Test
  public void testPutAndCheckAndPutInParallel() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()))
      .addFamily(new HColumnDescriptor(family));
    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
    Put[] puts = new Put[1];
    Put put = new Put(Bytes.toBytes("r1"));
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
    puts[0] = put;

    region.batchMutate(puts);
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
    ctx.addThread(new PutThread(ctx, region));
    ctx.addThread(new CheckAndPutThread(ctx, region));
    ctx.startThreads();
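    // Poll until the CheckAndPutThread reports completion before verifying the final cell value.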
    while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
      Thread.sleep(100);
    }
    ctx.stop();
    Scan s = new Scan();
    RegionScanner scanner = region.getScanner(s);
    List<Cell> results = new ArrayList<>();
    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
    scanner.next(results, scannerContext);
    for (Cell keyValue : results) {
      assertEquals("50", Bytes.toString(CellUtil.cloneValue(keyValue)));
    }
  }

  private class PutThread extends TestThread {
    private Region region;

    PutThread(TestContext ctx, Region region) {
      super(ctx);
      this.region = region;
    }

    @Override
    public void doWork() throws Exception {
      Put[] puts = new Put[1];
      Put put = new Put(Bytes.toBytes("r1"));
      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
      puts[0] = put;
      testStep = TestStep.PUT_STARTED;
      region.batchMutate(puts);
    }
  }

  private class CheckAndPutThread extends TestThread {
    private Region region;

    CheckAndPutThread(TestContext ctx, Region region) {
      super(ctx);
      this.region = region;
    }

    @Override
    public void doWork() throws Exception {
      Put[] puts = new Put[1];
      Put put = new Put(Bytes.toBytes("r1"));
      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
      puts[0] = put;
      while (testStep != TestStep.PUT_COMPLETED) {
        Thread.sleep(100);
      }
      testStep = TestStep.CHECKANDPUT_STARTED;
      region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
        CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put);
      testStep = TestStep.CHECKANDPUT_COMPLETED;
    }
  }

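  /**
   * HRegion subclass used by testPutAndCheckAndPutInParallel. Row locks are wrapped so that the
   * put thread's lock release stalls until the checkAndPut thread has requested the lock, which
   * recreates the HBASE-7051 interleaving: checkAndPut acquires the row lock and reads the value
   * prior to the put because the put's MVCC transaction has not yet completed.
   */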
  public static class MockHRegion extends HRegion {

    public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
      final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
    }

    @Override
    protected RowLock getRowLockInternal(final byte[] row, boolean readLock,
      final RowLock prevRowlock) throws IOException {
      if (testStep == TestStep.CHECKANDPUT_STARTED) {
        latch.countDown();
      }
      return new WrappedRowLock(super.getRowLockInternal(row, readLock, null));
    }

    public class WrappedRowLock implements RowLock {

      private final RowLock rowLock;

      private WrappedRowLock(RowLock rowLock) {
        this.rowLock = rowLock;
      }

      @Override
      public void release() {
        if (testStep == TestStep.INIT) {
          this.rowLock.release();
          return;
        }

        if (testStep == TestStep.PUT_STARTED) {
          try {
            testStep = TestStep.PUT_COMPLETED;
            this.rowLock.release();
            // put has been written to the memstore and the row lock has been released, but the
            // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
            // operations would cause the non-atomicity to show up:
            // 1) Put releases row lock (where we are now)
            // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
            // because the MVCC has not advanced
            // 3) Put advances MVCC
            // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
            // (see below), and then wait some more to give the checkAndPut time to read the old
            // value.
            latch.await();
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else if (testStep == TestStep.CHECKANDPUT_STARTED) {
          this.rowLock.release();
        }
      }
    }
  }
}