001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.List;
031import java.util.Objects;
032import java.util.Random;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicLong;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.CellUtil;
041import org.apache.hadoop.hbase.CompareOperator;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HColumnDescriptor;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HRegionInfo;
047import org.apache.hadoop.hbase.HTableDescriptor;
048import org.apache.hadoop.hbase.MultithreadedTestUtil;
049import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
050import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.Append;
053import org.apache.hadoop.hbase.client.Delete;
054import org.apache.hadoop.hbase.client.Durability;
055import org.apache.hadoop.hbase.client.Get;
056import org.apache.hadoop.hbase.client.Increment;
057import org.apache.hadoop.hbase.client.IsolationLevel;
058import org.apache.hadoop.hbase.client.Mutation;
059import org.apache.hadoop.hbase.client.Put;
060import org.apache.hadoop.hbase.client.RegionInfo;
061import org.apache.hadoop.hbase.client.Result;
062import org.apache.hadoop.hbase.client.RowMutations;
063import org.apache.hadoop.hbase.client.Scan;
064import org.apache.hadoop.hbase.client.TableDescriptor;
065import org.apache.hadoop.hbase.filter.BinaryComparator;
066import org.apache.hadoop.hbase.io.HeapSize;
067import org.apache.hadoop.hbase.io.hfile.BlockCache;
068import org.apache.hadoop.hbase.io.hfile.CacheConfig;
069import org.apache.hadoop.hbase.testclassification.LargeTests;
070import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.hbase.wal.WAL;
073import org.junit.After;
074import org.junit.Before;
075import org.junit.ClassRule;
076import org.junit.Rule;
077import org.junit.Test;
078import org.junit.experimental.categories.Category;
079import org.junit.rules.TestName;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083/**
084 * Testing of HRegion.incrementColumnValue, HRegion.increment,
085 * and HRegion.append
086 */
087@Category({VerySlowRegionServerTests.class, LargeTests.class}) // Starts 100 threads
088public class TestAtomicOperation {
089
  /** Class-level test rule, built for this test class via {@code HBaseClassTestRule.forClass}. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestAtomicOperation.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class);
  /** Exposes the name of the running test method; used to derive the table name. */
  @Rule public TestName name = new TestName();

  /** Region under test; assigned by each test and closed (then reset to null) in teardown(). */
  HRegion region = null;
  private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();

  // Test names
  // tableName is reassigned in setup() from the current test method name.
  static  byte[] tableName;
  // Column qualifiers shared by the tests below.
  static final byte[] qual1 = Bytes.toBytes("qual1");
  static final byte[] qual2 = Bytes.toBytes("qual2");
  static final byte[] qual3 = Bytes.toBytes("qual3");
  // Cell values written by the row-mutation tests.
  static final byte[] value1 = Bytes.toBytes("value1");
  static final byte[] value2 = Bytes.toBytes("value2");
  // Row keys; row2 is only used by the multi-row mutation test.
  static final byte [] row = Bytes.toBytes("rowA");
  static final byte [] row2 = Bytes.toBytes("rowB");
109
110  @Before
111  public void setup() {
112    tableName = Bytes.toBytes(name.getMethodName());
113  }
114
115  @After
116  public void teardown() throws IOException {
117    if (region != null) {
118      CacheConfig cacheConfig = region.getStores().get(0).getCacheConfig();
119      region.close();
120      WAL wal = region.getWAL();
121      if (wal != null) {
122        wal.close();
123      }
124      cacheConfig.getBlockCache().ifPresent(BlockCache::shutdown);
125      region = null;
126    }
127  }
128
  //////////////////////////////////////////////////////////////////////////////
  // New tests that don't spin up a mini cluster but rather just test the
  // individual code pieces in the HRegion.
  //////////////////////////////////////////////////////////////////////////////
133
134  /**
135   * Test basic append operation.
136   * More tests in
137   * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
138   */
139  @Test
140  public void testAppend() throws IOException {
141    initHRegion(tableName, name.getMethodName(), fam1);
142    String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
143    " The Universe, and Everything";
144    String v2 = " is... 42.";
145    Append a = new Append(row);
146    a.setReturnResults(false);
147    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
148    a.addColumn(fam1, qual2, Bytes.toBytes(v2));
149    assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty());
150    a = new Append(row);
151    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
152    a.addColumn(fam1, qual2, Bytes.toBytes(v1));
153    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
154    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
155    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
156  }
157
158  @Test
159  public void testAppendWithMultipleFamilies() throws IOException {
160    final byte[] fam3 = Bytes.toBytes("colfamily31");
161    initHRegion(tableName, name.getMethodName(), fam1, fam2, fam3);
162    String v1 = "Appended";
163    String v2 = "Value";
164
165    Append a = new Append(row);
166    a.setReturnResults(false);
167    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
168    a.addColumn(fam2, qual2, Bytes.toBytes(v2));
169    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
170    assertTrue("Expected an empty result but result contains " + result.size() + " keys",
171      result.isEmpty());
172
173    a = new Append(row);
174    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
175    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
176    a.addColumn(fam3, qual3, Bytes.toBytes(v2));
177    a.addColumn(fam1, qual2, Bytes.toBytes(v1));
178
179    result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
180
181    byte[] actualValue1 = result.getValue(fam1, qual1);
182    byte[] actualValue2 = result.getValue(fam2, qual2);
183    byte[] actualValue3 = result.getValue(fam3, qual3);
184    byte[] actualValue4 = result.getValue(fam1, qual2);
185
186    assertNotNull("Value1 should bot be null", actualValue1);
187    assertNotNull("Value2 should bot be null", actualValue2);
188    assertNotNull("Value3 should bot be null", actualValue3);
189    assertNotNull("Value4 should bot be null", actualValue4);
190    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), actualValue1));
191    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), actualValue2));
192    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2), actualValue3));
193    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1), actualValue4));
194  }
195
196  @Test
197  public void testAppendWithNonExistingFamily() throws IOException {
198    initHRegion(tableName, name.getMethodName(), fam1);
199    final String v1 = "Value";
200    final Append a = new Append(row);
201    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
202    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
203    Result result = null;
204    try {
205      result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
206      fail("Append operation should fail with NoSuchColumnFamilyException.");
207    } catch (NoSuchColumnFamilyException e) {
208      assertEquals(null, result);
209    } catch (Exception e) {
210      fail("Append operation should fail with NoSuchColumnFamilyException.");
211    }
212  }
213
214  @Test
215  public void testIncrementWithNonExistingFamily() throws IOException {
216    initHRegion(tableName, name.getMethodName(), fam1);
217    final Increment inc = new Increment(row);
218    inc.addColumn(fam1, qual1, 1);
219    inc.addColumn(fam2, qual2, 1);
220    inc.setDurability(Durability.ASYNC_WAL);
221    try {
222      region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
223    } catch (NoSuchColumnFamilyException e) {
224      final Get g = new Get(row);
225      final Result result = region.get(g);
226      assertEquals(null, result.getValue(fam1, qual1));
227      assertEquals(null, result.getValue(fam2, qual2));
228    } catch (Exception e) {
229      fail("Increment operation should fail with NoSuchColumnFamilyException.");
230    }
231  }
232
233  /**
234   * Test multi-threaded increments.
235   */
236  @Test
237  public void testIncrementMultiThreads() throws IOException {
238    boolean fast = true;
239    LOG.info("Starting test testIncrementMultiThreads");
240    // run a with mixed column families (1 and 3 versions)
241    initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
242
243    // Create 100 threads, each will increment by its own quantity. All 100 threads update the
244    // same row over two column families.
245    int numThreads = 100;
246    int incrementsPerThread = 1000;
247    Incrementer[] all = new Incrementer[numThreads];
248    int expectedTotal = 0;
249    // create all threads
250    for (int i = 0; i < numThreads; i++) {
251      all[i] = new Incrementer(region, i, i, incrementsPerThread);
252      expectedTotal += (i * incrementsPerThread);
253    }
254
255    // run all threads
256    for (int i = 0; i < numThreads; i++) {
257      all[i].start();
258    }
259
260    // wait for all threads to finish
261    for (int i = 0; i < numThreads; i++) {
262      try {
263        all[i].join();
264      } catch (InterruptedException e) {
265        LOG.info("Ignored", e);
266      }
267    }
268    assertICV(row, fam1, qual1, expectedTotal, fast);
269    assertICV(row, fam1, qual2, expectedTotal*2, fast);
270    assertICV(row, fam2, qual3, expectedTotal*3, fast);
271    LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
272  }
273
274
275  private void assertICV(byte [] row,
276                         byte [] familiy,
277                         byte[] qualifier,
278                         long amount,
279                         boolean fast) throws IOException {
280    // run a get and see?
281    Get get = new Get(row);
282    if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
283    get.addColumn(familiy, qualifier);
284    Result result = region.get(get);
285    assertEquals(1, result.size());
286
287    Cell kv = result.rawCells()[0];
288    long r = Bytes.toLong(CellUtil.cloneValue(kv));
289    assertEquals(amount, r);
290  }
291
292  private void initHRegion (byte [] tableName, String callingMethod,
293      byte[] ... families)
294    throws IOException {
295    initHRegion(tableName, callingMethod, null, families);
296  }
297
298  private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
299    byte[] ... families)
300  throws IOException {
301    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
302    int i=0;
303    for(byte [] family : families) {
304      HColumnDescriptor hcd = new HColumnDescriptor(family);
305      hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
306      htd.addFamily(hcd);
307    }
308    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
309    region = TEST_UTIL.createLocalHRegion(info, htd);
310  }
311
312  /**
313   * A thread that makes increment calls always on the same row, this.row against two column
314   * families on this row.
315   */
316  public static class Incrementer extends Thread {
317
318    private final Region region;
319    private final int numIncrements;
320    private final int amount;
321
322
323    public Incrementer(Region region, int threadNumber, int amount, int numIncrements) {
324      super("Incrementer." + threadNumber);
325      this.region = region;
326      this.numIncrements = numIncrements;
327      this.amount = amount;
328      setDaemon(true);
329    }
330
331    @Override
332    public void run() {
333      for (int i = 0; i < numIncrements; i++) {
334        try {
335          Increment inc = new Increment(row);
336          inc.addColumn(fam1, qual1, amount);
337          inc.addColumn(fam1, qual2, amount*2);
338          inc.addColumn(fam2, qual3, amount*3);
339          inc.setDurability(Durability.ASYNC_WAL);
340          Result result = region.increment(inc);
341          if (result != null) {
342            assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
343              Bytes.toLong(result.getValue(fam1, qual2)));
344            assertTrue(result.getValue(fam2, qual3) != null);
345            assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3,
346              Bytes.toLong(result.getValue(fam2, qual3)));
347            assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
348               Bytes.toLong(result.getValue(fam1, qual2)));
349            long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3;
350            long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
351            assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment,
352              fam1Increment, fam2Increment);
353          }
354        } catch (IOException e) {
355          e.printStackTrace();
356        }
357      }
358    }
359  }
360
361  @Test
362  public void testAppendMultiThreads() throws IOException {
363    LOG.info("Starting test testAppendMultiThreads");
364    // run a with mixed column families (1 and 3 versions)
365    initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
366
367    int numThreads = 100;
368    int opsPerThread = 100;
369    AtomicOperation[] all = new AtomicOperation[numThreads];
370    final byte[] val = new byte[]{1};
371
372    AtomicInteger failures = new AtomicInteger(0);
373    // create all threads
374    for (int i = 0; i < numThreads; i++) {
375      all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
376        @Override
377        public void run() {
378          for (int i=0; i<numOps; i++) {
379            try {
380              Append a = new Append(row);
381              a.addColumn(fam1, qual1, val);
382              a.addColumn(fam1, qual2, val);
383              a.addColumn(fam2, qual3, val);
384              a.setDurability(Durability.ASYNC_WAL);
385              region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
386
387              Get g = new Get(row);
388              Result result = region.get(g);
389              assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
390              assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
391            } catch (IOException e) {
392              e.printStackTrace();
393              failures.incrementAndGet();
394              fail();
395            }
396          }
397        }
398      };
399    }
400
401    // run all threads
402    for (int i = 0; i < numThreads; i++) {
403      all[i].start();
404    }
405
406    // wait for all threads to finish
407    for (int i = 0; i < numThreads; i++) {
408      try {
409        all[i].join();
410      } catch (InterruptedException e) {
411      }
412    }
413    assertEquals(0, failures.get());
414    Get g = new Get(row);
415    Result result = region.get(g);
416    assertEquals(10000, result.getValue(fam1, qual1).length);
417    assertEquals(10000, result.getValue(fam1, qual2).length);
418    assertEquals(10000, result.getValue(fam2, qual3).length);
419  }
420  /**
421   * Test multi-threaded row mutations.
422   */
423  @Test
424  public void testRowMutationMultiThreads() throws IOException {
425    LOG.info("Starting test testRowMutationMultiThreads");
426    initHRegion(tableName, name.getMethodName(), fam1);
427
428    // create 10 threads, each will alternate between adding and
429    // removing a column
430    int numThreads = 10;
431    int opsPerThread = 250;
432    AtomicOperation[] all = new AtomicOperation[numThreads];
433
434    AtomicLong timeStamps = new AtomicLong(0);
435    AtomicInteger failures = new AtomicInteger(0);
436    // create all threads
437    for (int i = 0; i < numThreads; i++) {
438      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
439        @Override
440        public void run() {
441          boolean op = true;
442          for (int i=0; i<numOps; i++) {
443            try {
444              // throw in some flushes
445              if (i%10==0) {
446                synchronized(region) {
447                  LOG.debug("flushing");
448                  region.flush(true);
449                  if (i%100==0) {
450                    region.compact(false);
451                  }
452                }
453              }
454              long ts = timeStamps.incrementAndGet();
455              RowMutations rm = new RowMutations(row);
456              if (op) {
457                Put p = new Put(row, ts);
458                p.addColumn(fam1, qual1, value1);
459                p.setDurability(Durability.ASYNC_WAL);
460                rm.add(p);
461                Delete d = new Delete(row);
462                d.addColumns(fam1, qual2, ts);
463                d.setDurability(Durability.ASYNC_WAL);
464                rm.add(d);
465              } else {
466                Delete d = new Delete(row);
467                d.addColumns(fam1, qual1, ts);
468                d.setDurability(Durability.ASYNC_WAL);
469                rm.add(d);
470                Put p = new Put(row, ts);
471                p.addColumn(fam1, qual2, value2);
472                p.setDurability(Durability.ASYNC_WAL);
473                rm.add(p);
474              }
475              region.mutateRow(rm);
476              op ^= true;
477              // check: should always see exactly one column
478              Get g = new Get(row);
479              Result r = region.get(g);
480              if (r.size() != 1) {
481                LOG.debug(Objects.toString(r));
482                failures.incrementAndGet();
483                fail();
484              }
485            } catch (IOException e) {
486              e.printStackTrace();
487              failures.incrementAndGet();
488              fail();
489            }
490          }
491        }
492      };
493    }
494
495    // run all threads
496    for (int i = 0; i < numThreads; i++) {
497      all[i].start();
498    }
499
500    // wait for all threads to finish
501    for (int i = 0; i < numThreads; i++) {
502      try {
503        all[i].join();
504      } catch (InterruptedException e) {
505      }
506    }
507    assertEquals(0, failures.get());
508  }
509
510
511  /**
512   * Test multi-threaded region mutations.
513   */
514  @Test
515  public void testMultiRowMutationMultiThreads() throws IOException {
516
517    LOG.info("Starting test testMultiRowMutationMultiThreads");
518    initHRegion(tableName, name.getMethodName(), fam1);
519
520    // create 10 threads, each will alternate between adding and
521    // removing a column
522    int numThreads = 10;
523    int opsPerThread = 250;
524    AtomicOperation[] all = new AtomicOperation[numThreads];
525
526    AtomicLong timeStamps = new AtomicLong(0);
527    AtomicInteger failures = new AtomicInteger(0);
528    final List<byte[]> rowsToLock = Arrays.asList(row, row2);
529    // create all threads
530    for (int i = 0; i < numThreads; i++) {
531      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
532        @Override
533        public void run() {
534          boolean op = true;
535          for (int i=0; i<numOps; i++) {
536            try {
537              // throw in some flushes
538              if (i%10==0) {
539                synchronized(region) {
540                  LOG.debug("flushing");
541                  region.flush(true);
542                  if (i%100==0) {
543                    region.compact(false);
544                  }
545                }
546              }
547              long ts = timeStamps.incrementAndGet();
548              List<Mutation> mrm = new ArrayList<>();
549              if (op) {
550                Put p = new Put(row2, ts);
551                p.addColumn(fam1, qual1, value1);
552                p.setDurability(Durability.ASYNC_WAL);
553                mrm.add(p);
554                Delete d = new Delete(row);
555                d.addColumns(fam1, qual1, ts);
556                d.setDurability(Durability.ASYNC_WAL);
557                mrm.add(d);
558              } else {
559                Delete d = new Delete(row2);
560                d.addColumns(fam1, qual1, ts);
561                d.setDurability(Durability.ASYNC_WAL);
562                mrm.add(d);
563                Put p = new Put(row, ts);
564                p.setDurability(Durability.ASYNC_WAL);
565                p.addColumn(fam1, qual1, value2);
566                mrm.add(p);
567              }
568              region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
569              op ^= true;
570              // check: should always see exactly one column
571              Scan s = new Scan(row);
572              RegionScanner rs = region.getScanner(s);
573              List<Cell> r = new ArrayList<>();
574              while (rs.next(r))
575                ;
576              rs.close();
577              if (r.size() != 1) {
578                LOG.debug(Objects.toString(r));
579                failures.incrementAndGet();
580                fail();
581              }
582            } catch (IOException e) {
583              e.printStackTrace();
584              failures.incrementAndGet();
585              fail();
586            }
587          }
588        }
589      };
590    }
591
592    // run all threads
593    for (int i = 0; i < numThreads; i++) {
594      all[i].start();
595    }
596
597    // wait for all threads to finish
598    for (int i = 0; i < numThreads; i++) {
599      try {
600        all[i].join();
601      } catch (InterruptedException e) {
602      }
603    }
604    assertEquals(0, failures.get());
605  }
606
607  public static class AtomicOperation extends Thread {
608    protected final HRegion region;
609    protected final int numOps;
610    protected final AtomicLong timeStamps;
611    protected final AtomicInteger failures;
612    protected final Random r = new Random();
613
614    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
615        AtomicInteger failures) {
616      this.region = region;
617      this.numOps = numOps;
618      this.timeStamps = timeStamps;
619      this.failures = failures;
620    }
621  }
622
  // Counted down by MockHRegion.getRowLockInternal once testStep is CHECKANDPUT_STARTED;
  // awaited in WrappedRowLock.release on the put path.
  private static CountDownLatch latch = new CountDownLatch(1);
  /** Progress marker shared by the threads of testPutAndCheckAndPutInParallel. */
  private enum TestStep {
    INIT,                  // initial put of 10 to set value of the cell
    PUT_STARTED,           // began doing a put of 50 to cell
    PUT_COMPLETED,         // put complete (released RowLock, but may not have advanced MVCC).
    CHECKANDPUT_STARTED,   // began checkAndPut: if 10 -> 11
    CHECKANDPUT_COMPLETED  // completed checkAndPut
    // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
  }
  // Current step; volatile because it is written and polled by different threads.
  private static volatile TestStep testStep = TestStep.INIT;
  // Column family used by testPutAndCheckAndPutInParallel and its helper threads.
  private final String family = "f1";
634
635  /**
636   * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
637   * MVCC.
638   *
639   * Moved into TestAtomicOperation from its original location, TestHBase7051
640   */
641  @Test
642  public void testPutAndCheckAndPutInParallel() throws Exception {
643    Configuration conf = TEST_UTIL.getConfiguration();
644    conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
645    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()))
646        .addFamily(new HColumnDescriptor(family));
647    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
648    Put[] puts = new Put[1];
649    Put put = new Put(Bytes.toBytes("r1"));
650    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
651    puts[0] = put;
652
653    region.batchMutate(puts);
654    MultithreadedTestUtil.TestContext ctx =
655      new MultithreadedTestUtil.TestContext(conf);
656    ctx.addThread(new PutThread(ctx, region));
657    ctx.addThread(new CheckAndPutThread(ctx, region));
658    ctx.startThreads();
659    while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
660      Thread.sleep(100);
661    }
662    ctx.stop();
663    Scan s = new Scan();
664    RegionScanner scanner = region.getScanner(s);
665    List<Cell> results = new ArrayList<>();
666    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
667    scanner.next(results, scannerContext);
668    for (Cell keyValue : results) {
669      assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
670    }
671  }
672
673  private class PutThread extends TestThread {
674    private Region region;
675    PutThread(TestContext ctx, Region region) {
676      super(ctx);
677      this.region = region;
678    }
679
680    @Override
681    public void doWork() throws Exception {
682      Put[] puts = new Put[1];
683      Put put = new Put(Bytes.toBytes("r1"));
684      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
685      puts[0] = put;
686      testStep = TestStep.PUT_STARTED;
687      region.batchMutate(puts);
688    }
689  }
690
691  private class CheckAndPutThread extends TestThread {
692    private Region region;
693    CheckAndPutThread(TestContext ctx, Region region) {
694      super(ctx);
695      this.region = region;
696   }
697
698    @Override
699    public void doWork() throws Exception {
700      Put[] puts = new Put[1];
701      Put put = new Put(Bytes.toBytes("r1"));
702      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
703      puts[0] = put;
704      while (testStep != TestStep.PUT_COMPLETED) {
705        Thread.sleep(100);
706      }
707      testStep = TestStep.CHECKANDPUT_STARTED;
708      region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
709        CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put);
710      testStep = TestStep.CHECKANDPUT_COMPLETED;
711    }
712  }
713
  /**
   * HRegion subclass used by testPutAndCheckAndPutInParallel (installed there through
   * {@code HConstants.REGION_IMPL}). It wraps every row lock so that releasing a lock can be
   * sequenced against the test's {@code testStep} state.
   */
  public static class MockHRegion extends HRegion {

    public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
        final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
    }

    /**
     * Acquires the row lock through the superclass and returns it wrapped in a
     * {@link WrappedRowLock}. If the checkAndPut has already started, the shared latch is
     * counted down first. Note that {@code prevRowlock} is not forwarded: {@code null} is
     * passed to the superclass instead.
     */
    @Override
    public RowLock getRowLockInternal(final byte[] row, boolean readLock,
        final RowLock prevRowlock) throws IOException {
      if (testStep == TestStep.CHECKANDPUT_STARTED) {
        latch.countDown();
      }
      return new WrappedRowLock(super.getRowLockInternal(row, readLock, null));
    }

    /** Row lock decorator whose release behavior depends on the current {@code testStep}. */
    public class WrappedRowLock implements RowLock {

      private final RowLock rowLock;

      private WrappedRowLock(RowLock rowLock) {
        this.rowLock = rowLock;
      }


      /**
       * Releases the wrapped lock according to the current step:
       * INIT releases immediately; PUT_STARTED moves the step to PUT_COMPLETED, releases,
       * then blocks on the latch and sleeps one more second; CHECKANDPUT_STARTED releases
       * immediately. In any other step this method does not release the wrapped lock.
       */
      @Override
      public void release() {
        if (testStep == TestStep.INIT) {
          this.rowLock.release();
          return;
        }

        if (testStep == TestStep.PUT_STARTED) {
          try {
            testStep = TestStep.PUT_COMPLETED;
            this.rowLock.release();
            // put has been written to the memstore and the row lock has been released, but the
            // MVCC has not been advanced.  Prior to fixing HBASE-7051, the following order of
            // operations would cause the non-atomicity to show up:
            // 1) Put releases row lock (where we are now)
            // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
            //    because the MVCC has not advanced
            // 3) Put advances MVCC
            // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
            // (see below), and then wait some more to give the checkAndPut time to read the old
            // value.
            latch.await();
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
          }
        }
        else if (testStep == TestStep.CHECKANDPUT_STARTED) {
          this.rowLock.release();
        }
      }
    }
  }
772}