001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.List;
031import java.util.Objects;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.CompareOperator;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.MultithreadedTestUtil;
045import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
046import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.Append;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
051import org.apache.hadoop.hbase.client.Delete;
052import org.apache.hadoop.hbase.client.Durability;
053import org.apache.hadoop.hbase.client.Get;
054import org.apache.hadoop.hbase.client.Increment;
055import org.apache.hadoop.hbase.client.IsolationLevel;
056import org.apache.hadoop.hbase.client.Mutation;
057import org.apache.hadoop.hbase.client.Put;
058import org.apache.hadoop.hbase.client.RegionInfo;
059import org.apache.hadoop.hbase.client.RegionInfoBuilder;
060import org.apache.hadoop.hbase.client.Result;
061import org.apache.hadoop.hbase.client.RowMutations;
062import org.apache.hadoop.hbase.client.Scan;
063import org.apache.hadoop.hbase.client.TableDescriptor;
064import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
065import org.apache.hadoop.hbase.filter.BinaryComparator;
066import org.apache.hadoop.hbase.io.HeapSize;
067import org.apache.hadoop.hbase.io.hfile.BlockCache;
068import org.apache.hadoop.hbase.io.hfile.CacheConfig;
069import org.apache.hadoop.hbase.testclassification.LargeTests;
070import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.hbase.wal.WAL;
073import org.junit.After;
074import org.junit.Before;
075import org.junit.ClassRule;
076import org.junit.Rule;
077import org.junit.Test;
078import org.junit.experimental.categories.Category;
079import org.junit.rules.TestName;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083/**
084 * Testing of HRegion.incrementColumnValue, HRegion.increment, and HRegion.append
085 */
086@Category({ VerySlowRegionServerTests.class, LargeTests.class }) // Starts 100 threads
087public class TestAtomicOperation {
088
  // Class-level JUnit rule built for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAtomicOperation.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class);
  // Exposes the name of the running test method; setup() derives the table name from it.
  @Rule
  public TestName name = new TestName();

  // Region under test; assigned by the individual tests and closed/cleared in teardown().
  HRegion region = null;
  private HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Test names
  // Table name for the current test; reassigned in setup() before every test method.
  static byte[] tableName;
  // Qualifiers, values and row keys shared by the tests and their worker threads.
  static final byte[] qual1 = Bytes.toBytes("qual1");
  static final byte[] qual2 = Bytes.toBytes("qual2");
  static final byte[] qual3 = Bytes.toBytes("qual3");
  static final byte[] value1 = Bytes.toBytes("value1");
  static final byte[] value2 = Bytes.toBytes("value2");
  static final byte[] row = Bytes.toBytes("rowA");
  static final byte[] row2 = Bytes.toBytes("rowB");
109
110  @Before
111  public void setup() {
112    tableName = Bytes.toBytes(name.getMethodName());
113  }
114
115  @After
116  public void teardown() throws IOException {
117    if (region != null) {
118      CacheConfig cacheConfig = region.getStores().get(0).getCacheConfig();
119      region.close();
120      WAL wal = region.getWAL();
121      if (wal != null) {
122        wal.close();
123      }
124      cacheConfig.getBlockCache().ifPresent(BlockCache::shutdown);
125      region = null;
126    }
127  }
128
129  //////////////////////////////////////////////////////////////////////////////
  // New tests that don't spin up a mini cluster but rather just test the
  // individual code pieces in the HRegion.
132  //////////////////////////////////////////////////////////////////////////////
133
134  /**
135   * Test basic append operation. More tests in
136   * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
137   */
138  @Test
139  public void testAppend() throws IOException {
140    initHRegion(tableName, name.getMethodName(), fam1);
141    String v1 =
142      "Ultimate Answer to the Ultimate Question of Life," + " The Universe, and Everything";
143    String v2 = " is... 42.";
144    Append a = new Append(row);
145    a.setReturnResults(false);
146    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
147    a.addColumn(fam1, qual2, Bytes.toBytes(v2));
148    assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty());
149    a = new Append(row);
150    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
151    a.addColumn(fam1, qual2, Bytes.toBytes(v1));
152    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
153    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), result.getValue(fam1, qual1)));
154    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), result.getValue(fam1, qual2)));
155  }
156
157  @Test
158  public void testAppendWithMultipleFamilies() throws IOException {
159    final byte[] fam3 = Bytes.toBytes("colfamily31");
160    initHRegion(tableName, name.getMethodName(), fam1, fam2, fam3);
161    String v1 = "Appended";
162    String v2 = "Value";
163
164    Append a = new Append(row);
165    a.setReturnResults(false);
166    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
167    a.addColumn(fam2, qual2, Bytes.toBytes(v2));
168    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
169    assertTrue("Expected an empty result but result contains " + result.size() + " keys",
170      result.isEmpty());
171
172    a = new Append(row);
173    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
174    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
175    a.addColumn(fam3, qual3, Bytes.toBytes(v2));
176    a.addColumn(fam1, qual2, Bytes.toBytes(v1));
177
178    result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
179
180    byte[] actualValue1 = result.getValue(fam1, qual1);
181    byte[] actualValue2 = result.getValue(fam2, qual2);
182    byte[] actualValue3 = result.getValue(fam3, qual3);
183    byte[] actualValue4 = result.getValue(fam1, qual2);
184
185    assertNotNull("Value1 should bot be null", actualValue1);
186    assertNotNull("Value2 should bot be null", actualValue2);
187    assertNotNull("Value3 should bot be null", actualValue3);
188    assertNotNull("Value4 should bot be null", actualValue4);
189    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), actualValue1));
190    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), actualValue2));
191    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2), actualValue3));
192    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1), actualValue4));
193  }
194
195  @Test
196  public void testAppendWithNonExistingFamily() throws IOException {
197    initHRegion(tableName, name.getMethodName(), fam1);
198    final String v1 = "Value";
199    final Append a = new Append(row);
200    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
201    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
202    Result result = null;
203    try {
204      result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
205      fail("Append operation should fail with NoSuchColumnFamilyException.");
206    } catch (NoSuchColumnFamilyException e) {
207      assertEquals(null, result);
208    } catch (Exception e) {
209      fail("Append operation should fail with NoSuchColumnFamilyException.");
210    }
211  }
212
213  @Test
214  public void testIncrementWithNonExistingFamily() throws IOException {
215    initHRegion(tableName, name.getMethodName(), fam1);
216    final Increment inc = new Increment(row);
217    inc.addColumn(fam1, qual1, 1);
218    inc.addColumn(fam2, qual2, 1);
219    inc.setDurability(Durability.ASYNC_WAL);
220    try {
221      region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
222    } catch (NoSuchColumnFamilyException e) {
223      final Get g = new Get(row);
224      final Result result = region.get(g);
225      assertEquals(null, result.getValue(fam1, qual1));
226      assertEquals(null, result.getValue(fam2, qual2));
227    } catch (Exception e) {
228      fail("Increment operation should fail with NoSuchColumnFamilyException.");
229    }
230  }
231
232  /**
233   * Test multi-threaded increments.
234   */
235  @Test
236  public void testIncrementMultiThreads() throws IOException {
237    boolean fast = true;
238    LOG.info("Starting test testIncrementMultiThreads");
239    // run a with mixed column families (1 and 3 versions)
240    initHRegion(tableName, name.getMethodName(), new int[] { 1, 3 }, fam1, fam2);
241
242    // Create 100 threads, each will increment by its own quantity. All 100 threads update the
243    // same row over two column families.
244    int numThreads = 100;
245    int incrementsPerThread = 1000;
246    Incrementer[] all = new Incrementer[numThreads];
247    int expectedTotal = 0;
248    // create all threads
249    for (int i = 0; i < numThreads; i++) {
250      all[i] = new Incrementer(region, i, i, incrementsPerThread);
251      expectedTotal += (i * incrementsPerThread);
252    }
253
254    // run all threads
255    for (int i = 0; i < numThreads; i++) {
256      all[i].start();
257    }
258
259    // wait for all threads to finish
260    for (int i = 0; i < numThreads; i++) {
261      try {
262        all[i].join();
263      } catch (InterruptedException e) {
264        LOG.info("Ignored", e);
265      }
266    }
267    assertICV(row, fam1, qual1, expectedTotal, fast);
268    assertICV(row, fam1, qual2, expectedTotal * 2, fast);
269    assertICV(row, fam2, qual3, expectedTotal * 3, fast);
270    LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
271  }
272
273  private void assertICV(byte[] row, byte[] familiy, byte[] qualifier, long amount, boolean fast)
274    throws IOException {
275    // run a get and see?
276    Get get = new Get(row);
277    if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
278    get.addColumn(familiy, qualifier);
279    Result result = region.get(get);
280    assertEquals(1, result.size());
281
282    Cell kv = result.rawCells()[0];
283    long r = Bytes.toLong(CellUtil.cloneValue(kv));
284    assertEquals(amount, r);
285  }
286
287  private void initHRegion(byte[] tableName, String callingMethod, byte[]... families)
288    throws IOException {
289    initHRegion(tableName, callingMethod, null, families);
290  }
291
292  private void initHRegion(byte[] tableName, String callingMethod, int[] maxVersions,
293    byte[]... families) throws IOException {
294    TableDescriptorBuilder builder =
295      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
296
297    int i = 0;
298    for (byte[] family : families) {
299      ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(family)
300        .setMaxVersions(maxVersions != null ? maxVersions[i++] : 1).build();
301      builder.setColumnFamily(familyDescriptor);
302    }
303    TableDescriptor tableDescriptor = builder.build();
304    RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
305    region = TEST_UTIL.createLocalHRegion(info, tableDescriptor);
306  }
307
308  /**
309   * A thread that makes increment calls always on the same row, this.row against two column
310   * families on this row.
311   */
312  public static class Incrementer extends Thread {
313
314    private final Region region;
315    private final int numIncrements;
316    private final int amount;
317
318    public Incrementer(Region region, int threadNumber, int amount, int numIncrements) {
319      super("Incrementer." + threadNumber);
320      this.region = region;
321      this.numIncrements = numIncrements;
322      this.amount = amount;
323      setDaemon(true);
324    }
325
326    @Override
327    public void run() {
328      for (int i = 0; i < numIncrements; i++) {
329        try {
330          Increment inc = new Increment(row);
331          inc.addColumn(fam1, qual1, amount);
332          inc.addColumn(fam1, qual2, amount * 2);
333          inc.addColumn(fam2, qual3, amount * 3);
334          inc.setDurability(Durability.ASYNC_WAL);
335          Result result = region.increment(inc);
336          if (result != null) {
337            assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2,
338              Bytes.toLong(result.getValue(fam1, qual2)));
339            assertTrue(result.getValue(fam2, qual3) != null);
340            assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 3,
341              Bytes.toLong(result.getValue(fam2, qual3)));
342            assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2,
343              Bytes.toLong(result.getValue(fam1, qual2)));
344            long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1)) * 3;
345            long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
346            assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment, fam1Increment,
347              fam2Increment);
348          }
349        } catch (IOException e) {
350          e.printStackTrace();
351        }
352      }
353    }
354  }
355
356  @Test
357  public void testAppendMultiThreads() throws IOException {
358    LOG.info("Starting test testAppendMultiThreads");
359    // run a with mixed column families (1 and 3 versions)
360    initHRegion(tableName, name.getMethodName(), new int[] { 1, 3 }, fam1, fam2);
361
362    int numThreads = 100;
363    int opsPerThread = 100;
364    AtomicOperation[] all = new AtomicOperation[numThreads];
365    final byte[] val = new byte[] { 1 };
366
367    AtomicInteger failures = new AtomicInteger(0);
368    // create all threads
369    for (int i = 0; i < numThreads; i++) {
370      all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
371        @Override
372        public void run() {
373          for (int i = 0; i < numOps; i++) {
374            try {
375              Append a = new Append(row);
376              a.addColumn(fam1, qual1, val);
377              a.addColumn(fam1, qual2, val);
378              a.addColumn(fam2, qual3, val);
379              a.setDurability(Durability.ASYNC_WAL);
380              region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
381
382              Get g = new Get(row);
383              Result result = region.get(g);
384              assertEquals(result.getValue(fam1, qual1).length,
385                result.getValue(fam1, qual2).length);
386              assertEquals(result.getValue(fam1, qual1).length,
387                result.getValue(fam2, qual3).length);
388            } catch (IOException e) {
389              e.printStackTrace();
390              failures.incrementAndGet();
391              fail();
392            }
393          }
394        }
395      };
396    }
397
398    // run all threads
399    for (int i = 0; i < numThreads; i++) {
400      all[i].start();
401    }
402
403    // wait for all threads to finish
404    for (int i = 0; i < numThreads; i++) {
405      try {
406        all[i].join();
407      } catch (InterruptedException e) {
408      }
409    }
410    assertEquals(0, failures.get());
411    Get g = new Get(row);
412    Result result = region.get(g);
413    assertEquals(10000, result.getValue(fam1, qual1).length);
414    assertEquals(10000, result.getValue(fam1, qual2).length);
415    assertEquals(10000, result.getValue(fam2, qual3).length);
416  }
417
418  /**
419   * Test multi-threaded row mutations.
420   */
421  @Test
422  public void testRowMutationMultiThreads() throws IOException {
423    LOG.info("Starting test testRowMutationMultiThreads");
424    initHRegion(tableName, name.getMethodName(), fam1);
425
426    // create 10 threads, each will alternate between adding and
427    // removing a column
428    int numThreads = 10;
429    int opsPerThread = 250;
430    AtomicOperation[] all = new AtomicOperation[numThreads];
431
432    AtomicLong timeStamps = new AtomicLong(0);
433    AtomicInteger failures = new AtomicInteger(0);
434    // create all threads
435    for (int i = 0; i < numThreads; i++) {
436      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
437        @Override
438        public void run() {
439          boolean op = true;
440          for (int i = 0; i < numOps; i++) {
441            try {
442              // throw in some flushes
443              if (i % 10 == 0) {
444                synchronized (region) {
445                  LOG.debug("flushing");
446                  region.flush(true);
447                  if (i % 100 == 0) {
448                    region.compact(false);
449                  }
450                }
451              }
452              long ts = timeStamps.incrementAndGet();
453              RowMutations rm = new RowMutations(row);
454              if (op) {
455                Put p = new Put(row, ts);
456                p.addColumn(fam1, qual1, value1);
457                p.setDurability(Durability.ASYNC_WAL);
458                rm.add(p);
459                Delete d = new Delete(row);
460                d.addColumns(fam1, qual2, ts);
461                d.setDurability(Durability.ASYNC_WAL);
462                rm.add(d);
463              } else {
464                Delete d = new Delete(row);
465                d.addColumns(fam1, qual1, ts);
466                d.setDurability(Durability.ASYNC_WAL);
467                rm.add(d);
468                Put p = new Put(row, ts);
469                p.addColumn(fam1, qual2, value2);
470                p.setDurability(Durability.ASYNC_WAL);
471                rm.add(p);
472              }
473              region.mutateRow(rm);
474              op ^= true;
475              // check: should always see exactly one column
476              Get g = new Get(row);
477              Result r = region.get(g);
478              if (r.size() != 1) {
479                LOG.debug(Objects.toString(r));
480                failures.incrementAndGet();
481                fail();
482              }
483            } catch (IOException e) {
484              e.printStackTrace();
485              failures.incrementAndGet();
486              fail();
487            }
488          }
489        }
490      };
491    }
492
493    // run all threads
494    for (int i = 0; i < numThreads; i++) {
495      all[i].start();
496    }
497
498    // wait for all threads to finish
499    for (int i = 0; i < numThreads; i++) {
500      try {
501        all[i].join();
502      } catch (InterruptedException e) {
503      }
504    }
505    assertEquals(0, failures.get());
506  }
507
508  /**
509   * Test multi-threaded region mutations.
510   */
511  @Test
512  public void testMultiRowMutationMultiThreads() throws IOException {
513
514    LOG.info("Starting test testMultiRowMutationMultiThreads");
515    initHRegion(tableName, name.getMethodName(), fam1);
516
517    // create 10 threads, each will alternate between adding and
518    // removing a column
519    int numThreads = 10;
520    int opsPerThread = 250;
521    AtomicOperation[] all = new AtomicOperation[numThreads];
522
523    AtomicLong timeStamps = new AtomicLong(0);
524    AtomicInteger failures = new AtomicInteger(0);
525    final List<byte[]> rowsToLock = Arrays.asList(row, row2);
526    // create all threads
527    for (int i = 0; i < numThreads; i++) {
528      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
529        @Override
530        public void run() {
531          boolean op = true;
532          for (int i = 0; i < numOps; i++) {
533            try {
534              // throw in some flushes
535              if (i % 10 == 0) {
536                synchronized (region) {
537                  LOG.debug("flushing");
538                  region.flush(true);
539                  if (i % 100 == 0) {
540                    region.compact(false);
541                  }
542                }
543              }
544              long ts = timeStamps.incrementAndGet();
545              List<Mutation> mrm = new ArrayList<>();
546              if (op) {
547                Put p = new Put(row2, ts);
548                p.addColumn(fam1, qual1, value1);
549                p.setDurability(Durability.ASYNC_WAL);
550                mrm.add(p);
551                Delete d = new Delete(row);
552                d.addColumns(fam1, qual1, ts);
553                d.setDurability(Durability.ASYNC_WAL);
554                mrm.add(d);
555              } else {
556                Delete d = new Delete(row2);
557                d.addColumns(fam1, qual1, ts);
558                d.setDurability(Durability.ASYNC_WAL);
559                mrm.add(d);
560                Put p = new Put(row, ts);
561                p.setDurability(Durability.ASYNC_WAL);
562                p.addColumn(fam1, qual1, value2);
563                mrm.add(p);
564              }
565              region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
566              op ^= true;
567              // check: should always see exactly one column
568              Scan s = new Scan().withStartRow(row);
569              RegionScanner rs = region.getScanner(s);
570              List<Cell> r = new ArrayList<>();
571              while (rs.next(r))
572                ;
573              rs.close();
574              if (r.size() != 1) {
575                LOG.debug(Objects.toString(r));
576                failures.incrementAndGet();
577                fail();
578              }
579            } catch (IOException e) {
580              e.printStackTrace();
581              failures.incrementAndGet();
582              fail();
583            }
584          }
585        }
586      };
587    }
588
589    // run all threads
590    for (int i = 0; i < numThreads; i++) {
591      all[i].start();
592    }
593
594    // wait for all threads to finish
595    for (int i = 0; i < numThreads; i++) {
596      try {
597        all[i].join();
598      } catch (InterruptedException e) {
599      }
600    }
601    assertEquals(0, failures.get());
602  }
603
604  public static class AtomicOperation extends Thread {
605    protected final HRegion region;
606    protected final int numOps;
607    protected final AtomicLong timeStamps;
608    protected final AtomicInteger failures;
609
610    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
611      AtomicInteger failures) {
612      this.region = region;
613      this.numOps = numOps;
614      this.timeStamps = timeStamps;
615      this.failures = failures;
616    }
617  }
618
  // Counted down by MockHRegion.getRowLockInternal once testStep is CHECKANDPUT_STARTED, and
  // awaited in MockHRegion.WrappedRowLock.release right after the put releases its row lock.
  private static CountDownLatch latch = new CountDownLatch(1);

  // Phases of testPutAndCheckAndPutInParallel. PutThread sets PUT_STARTED, WrappedRowLock.release
  // sets PUT_COMPLETED, and CheckAndPutThread sets the two CHECKANDPUT_* steps.
  private enum TestStep {
    INIT, // initial put of 10 to set value of the cell
    PUT_STARTED, // began doing a put of 50 to cell
    PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC).
    CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11
    CHECKANDPUT_COMPLETED // completed checkAndPut
    // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
  }

  // Current phase; written and polled by the test thread, both worker threads and MockHRegion.
  private static volatile TestStep testStep = TestStep.INIT;
  // Column family name used by testPutAndCheckAndPutInParallel and its worker threads.
  private final String family = "f1";
632
633  /**
634   * Test written as a verifier for HBASE-7051, CheckAndPut should properly read MVCC. Moved into
635   * TestAtomicOperation from its original location, TestHBase7051
636   */
637  @Test
638  public void testPutAndCheckAndPutInParallel() throws Exception {
639    Configuration conf = TEST_UTIL.getConfiguration();
640    conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
641    TableDescriptorBuilder tableDescriptorBuilder =
642      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()));
643    ColumnFamilyDescriptor columnFamilyDescriptor =
644      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build();
645    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
646    this.region = TEST_UTIL.createLocalHRegion(tableDescriptorBuilder.build(), null, null);
647    Put[] puts = new Put[1];
648    Put put = new Put(Bytes.toBytes("r1"));
649    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
650    puts[0] = put;
651
652    region.batchMutate(puts);
653    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
654    ctx.addThread(new PutThread(ctx, region));
655    ctx.addThread(new CheckAndPutThread(ctx, region));
656    ctx.startThreads();
657    while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
658      Thread.sleep(100);
659    }
660    ctx.stop();
661    Scan s = new Scan();
662    RegionScanner scanner = region.getScanner(s);
663    List<Cell> results = new ArrayList<>();
664    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
665    scanner.next(results, scannerContext);
666    for (Cell keyValue : results) {
667      assertEquals("50", Bytes.toString(CellUtil.cloneValue(keyValue)));
668    }
669  }
670
671  private class PutThread extends TestThread {
672    private Region region;
673
674    PutThread(TestContext ctx, Region region) {
675      super(ctx);
676      this.region = region;
677    }
678
679    @Override
680    public void doWork() throws Exception {
681      Put[] puts = new Put[1];
682      Put put = new Put(Bytes.toBytes("r1"));
683      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
684      puts[0] = put;
685      testStep = TestStep.PUT_STARTED;
686      region.batchMutate(puts);
687    }
688  }
689
690  private class CheckAndPutThread extends TestThread {
691    private Region region;
692
693    CheckAndPutThread(TestContext ctx, Region region) {
694      super(ctx);
695      this.region = region;
696    }
697
698    @Override
699    public void doWork() throws Exception {
700      Put[] puts = new Put[1];
701      Put put = new Put(Bytes.toBytes("r1"));
702      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
703      puts[0] = put;
704      while (testStep != TestStep.PUT_COMPLETED) {
705        Thread.sleep(100);
706      }
707      testStep = TestStep.CHECKANDPUT_STARTED;
708      region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
709        CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put);
710      testStep = TestStep.CHECKANDPUT_COMPLETED;
711    }
712  }
713
  /**
   * HRegion subclass used by testPutAndCheckAndPutInParallel. It wraps every row lock it hands out
   * so that the release of the put's row lock can be stalled until the checkAndPut has asked for
   * its own row lock, which recreates the interleaving described in HBASE-7051.
   */
  public static class MockHRegion extends HRegion {

    public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
      final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
    }

    /**
     * Signals the shared latch when a row lock is requested during CHECKANDPUT_STARTED, then
     * returns the superclass lock wrapped in a {@link WrappedRowLock}.
     */
    @Override
    protected RowLock getRowLockInternal(final byte[] row, boolean readLock,
      final RowLock prevRowlock) throws IOException {
      if (testStep == TestStep.CHECKANDPUT_STARTED) {
        // Lets the put thread, which awaits this latch in WrappedRowLock.release, continue.
        latch.countDown();
      }
      // NOTE(review): prevRowlock is not forwarded; null is passed to super instead. Presumably
      // because callers would hand back a WrappedRowLock rather than the lock super created --
      // confirm.
      return new WrappedRowLock(super.getRowLockInternal(row, readLock, null));
    }

    /** Row lock decorator whose release behaviour depends on the current test step. */
    public class WrappedRowLock implements RowLock {

      private final RowLock rowLock;

      private WrappedRowLock(RowLock rowLock) {
        this.rowLock = rowLock;
      }

      @Override
      public void release() {
        // INIT: the seeding put of the test; release normally.
        if (testStep == TestStep.INIT) {
          this.rowLock.release();
          return;
        }

        if (testStep == TestStep.PUT_STARTED) {
          try {
            // Flip the step before releasing so the checkAndPut thread's polling loop can exit.
            testStep = TestStep.PUT_COMPLETED;
            this.rowLock.release();
            // put has been written to the memstore and the row lock has been released, but the
            // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
            // operations would cause the non-atomicity to show up:
            // 1) Put releases row lock (where we are now)
            // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
            // because the MVCC has not advanced
            // 3) Put advances MVCC
            // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
            // (see below), and then wait some more to give the checkAndPut time to read the old
            // value.
            latch.await();
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
          }
        } else if (testStep == TestStep.CHECKANDPUT_STARTED) {
          this.rowLock.release();
        }
        // NOTE(review): for PUT_COMPLETED and CHECKANDPUT_COMPLETED no branch matches, so the
        // wrapped lock is not released by this call -- confirm this is intended.
      }
    }
  }
770}