001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.Objects;
031import java.util.Random;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.CompareOperator;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.HBaseTestingUtility;
043import org.apache.hadoop.hbase.HColumnDescriptor;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.HRegionInfo;
046import org.apache.hadoop.hbase.HTableDescriptor;
047import org.apache.hadoop.hbase.MultithreadedTestUtil;
048import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
049import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.client.Append;
052import org.apache.hadoop.hbase.client.Delete;
053import org.apache.hadoop.hbase.client.Durability;
054import org.apache.hadoop.hbase.client.Get;
055import org.apache.hadoop.hbase.client.Increment;
056import org.apache.hadoop.hbase.client.IsolationLevel;
057import org.apache.hadoop.hbase.client.Mutation;
058import org.apache.hadoop.hbase.client.Put;
059import org.apache.hadoop.hbase.client.RegionInfo;
060import org.apache.hadoop.hbase.client.Result;
061import org.apache.hadoop.hbase.client.RowMutations;
062import org.apache.hadoop.hbase.client.Scan;
063import org.apache.hadoop.hbase.client.TableDescriptor;
064import org.apache.hadoop.hbase.filter.BinaryComparator;
065import org.apache.hadoop.hbase.io.HeapSize;
066import org.apache.hadoop.hbase.io.hfile.BlockCache;
067import org.apache.hadoop.hbase.testclassification.MediumTests;
068import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.wal.WAL;
071import org.junit.After;
072import org.junit.Before;
073import org.junit.ClassRule;
074import org.junit.Rule;
075import org.junit.Test;
076import org.junit.experimental.categories.Category;
077import org.junit.rules.TestName;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081/**
082 * Testing of HRegion.incrementColumnValue, HRegion.increment,
083 * and HRegion.append
084 */
085@Category({VerySlowRegionServerTests.class, MediumTests.class}) // Starts 100 threads
086public class TestAtomicOperation {
087
088  @ClassRule
089  public static final HBaseClassTestRule CLASS_RULE =
090      HBaseClassTestRule.forClass(TestAtomicOperation.class);
091
092  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class);
093  @Rule public TestName name = new TestName();
094
095  HRegion region = null;
096  private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
097
098  // Test names
099  static  byte[] tableName;
100  static final byte[] qual1 = Bytes.toBytes("qual1");
101  static final byte[] qual2 = Bytes.toBytes("qual2");
102  static final byte[] qual3 = Bytes.toBytes("qual3");
103  static final byte[] value1 = Bytes.toBytes("value1");
104  static final byte[] value2 = Bytes.toBytes("value2");
105  static final byte [] row = Bytes.toBytes("rowA");
106  static final byte [] row2 = Bytes.toBytes("rowB");
107
108  @Before
109  public void setup() {
110    tableName = Bytes.toBytes(name.getMethodName());
111  }
112
113  @After
114  public void teardown() throws IOException {
115    if (region != null) {
116      BlockCache bc = region.getStores().get(0).getCacheConfig().getBlockCache();
117      region.close();
118      WAL wal = region.getWAL();
119      if (wal != null) wal.close();
120      if (bc != null) bc.shutdown();
121      region = null;
122    }
123  }
  //////////////////////////////////////////////////////////////////////////////
  // New tests that don't spin up a mini cluster but rather just test the
  // individual code pieces in the HRegion.
  //////////////////////////////////////////////////////////////////////////////
128
129  /**
130   * Test basic append operation.
131   * More tests in
132   * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
133   */
134  @Test
135  public void testAppend() throws IOException {
136    initHRegion(tableName, name.getMethodName(), fam1);
137    String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
138    " The Universe, and Everything";
139    String v2 = " is... 42.";
140    Append a = new Append(row);
141    a.setReturnResults(false);
142    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
143    a.addColumn(fam1, qual2, Bytes.toBytes(v2));
144    assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty());
145    a = new Append(row);
146    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
147    a.addColumn(fam1, qual2, Bytes.toBytes(v1));
148    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
149    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
150    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
151  }
152
153  @Test
154  public void testAppendWithNonExistingFamily() throws IOException {
155    initHRegion(tableName, name.getMethodName(), fam1);
156    final String v1 = "Value";
157    final Append a = new Append(row);
158    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
159    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
160    Result result = null;
161    try {
162      result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
163      fail("Append operation should fail with NoSuchColumnFamilyException.");
164    } catch (NoSuchColumnFamilyException e) {
165      assertEquals(null, result);
166    } catch (Exception e) {
167      fail("Append operation should fail with NoSuchColumnFamilyException.");
168    }
169  }
170
171  @Test
172  public void testIncrementWithNonExistingFamily() throws IOException {
173    initHRegion(tableName, name.getMethodName(), fam1);
174    final Increment inc = new Increment(row);
175    inc.addColumn(fam1, qual1, 1);
176    inc.addColumn(fam2, qual2, 1);
177    inc.setDurability(Durability.ASYNC_WAL);
178    try {
179      region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
180    } catch (NoSuchColumnFamilyException e) {
181      final Get g = new Get(row);
182      final Result result = region.get(g);
183      assertEquals(null, result.getValue(fam1, qual1));
184      assertEquals(null, result.getValue(fam2, qual2));
185    } catch (Exception e) {
186      fail("Increment operation should fail with NoSuchColumnFamilyException.");
187    }
188  }
189
190  /**
191   * Test multi-threaded increments.
192   */
193  @Test
194  public void testIncrementMultiThreads() throws IOException {
195    boolean fast = true;
196    LOG.info("Starting test testIncrementMultiThreads");
197    // run a with mixed column families (1 and 3 versions)
198    initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
199
200    // Create 100 threads, each will increment by its own quantity. All 100 threads update the
201    // same row over two column families.
202    int numThreads = 100;
203    int incrementsPerThread = 1000;
204    Incrementer[] all = new Incrementer[numThreads];
205    int expectedTotal = 0;
206    // create all threads
207    for (int i = 0; i < numThreads; i++) {
208      all[i] = new Incrementer(region, i, i, incrementsPerThread);
209      expectedTotal += (i * incrementsPerThread);
210    }
211
212    // run all threads
213    for (int i = 0; i < numThreads; i++) {
214      all[i].start();
215    }
216
217    // wait for all threads to finish
218    for (int i = 0; i < numThreads; i++) {
219      try {
220        all[i].join();
221      } catch (InterruptedException e) {
222        LOG.info("Ignored", e);
223      }
224    }
225    assertICV(row, fam1, qual1, expectedTotal, fast);
226    assertICV(row, fam1, qual2, expectedTotal*2, fast);
227    assertICV(row, fam2, qual3, expectedTotal*3, fast);
228    LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
229  }
230
231
232  private void assertICV(byte [] row,
233                         byte [] familiy,
234                         byte[] qualifier,
235                         long amount,
236                         boolean fast) throws IOException {
237    // run a get and see?
238    Get get = new Get(row);
239    if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
240    get.addColumn(familiy, qualifier);
241    Result result = region.get(get);
242    assertEquals(1, result.size());
243
244    Cell kv = result.rawCells()[0];
245    long r = Bytes.toLong(CellUtil.cloneValue(kv));
246    assertEquals(amount, r);
247  }
248
249  private void initHRegion (byte [] tableName, String callingMethod,
250      byte[] ... families)
251    throws IOException {
252    initHRegion(tableName, callingMethod, null, families);
253  }
254
255  private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
256    byte[] ... families)
257  throws IOException {
258    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
259    int i=0;
260    for(byte [] family : families) {
261      HColumnDescriptor hcd = new HColumnDescriptor(family);
262      hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
263      htd.addFamily(hcd);
264    }
265    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
266    region = TEST_UTIL.createLocalHRegion(info, htd);
267  }
268
269  /**
270   * A thread that makes increment calls always on the same row, this.row against two column
271   * families on this row.
272   */
273  public static class Incrementer extends Thread {
274
275    private final Region region;
276    private final int numIncrements;
277    private final int amount;
278
279
280    public Incrementer(Region region, int threadNumber, int amount, int numIncrements) {
281      super("Incrementer." + threadNumber);
282      this.region = region;
283      this.numIncrements = numIncrements;
284      this.amount = amount;
285      setDaemon(true);
286    }
287
288    @Override
289    public void run() {
290      for (int i = 0; i < numIncrements; i++) {
291        try {
292          Increment inc = new Increment(row);
293          inc.addColumn(fam1, qual1, amount);
294          inc.addColumn(fam1, qual2, amount*2);
295          inc.addColumn(fam2, qual3, amount*3);
296          inc.setDurability(Durability.ASYNC_WAL);
297          Result result = region.increment(inc);
298          if (result != null) {
299            assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
300              Bytes.toLong(result.getValue(fam1, qual2)));
301            assertTrue(result.getValue(fam2, qual3) != null);
302            assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3,
303              Bytes.toLong(result.getValue(fam2, qual3)));
304            assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
305               Bytes.toLong(result.getValue(fam1, qual2)));
306            long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3;
307            long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
308            assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment,
309              fam1Increment, fam2Increment);
310          }
311        } catch (IOException e) {
312          e.printStackTrace();
313        }
314      }
315    }
316  }
317
318  @Test
319  public void testAppendMultiThreads() throws IOException {
320    LOG.info("Starting test testAppendMultiThreads");
321    // run a with mixed column families (1 and 3 versions)
322    initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
323
324    int numThreads = 100;
325    int opsPerThread = 100;
326    AtomicOperation[] all = new AtomicOperation[numThreads];
327    final byte[] val = new byte[]{1};
328
329    AtomicInteger failures = new AtomicInteger(0);
330    // create all threads
331    for (int i = 0; i < numThreads; i++) {
332      all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
333        @Override
334        public void run() {
335          for (int i=0; i<numOps; i++) {
336            try {
337              Append a = new Append(row);
338              a.addColumn(fam1, qual1, val);
339              a.addColumn(fam1, qual2, val);
340              a.addColumn(fam2, qual3, val);
341              a.setDurability(Durability.ASYNC_WAL);
342              region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
343
344              Get g = new Get(row);
345              Result result = region.get(g);
346              assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
347              assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
348            } catch (IOException e) {
349              e.printStackTrace();
350              failures.incrementAndGet();
351              fail();
352            }
353          }
354        }
355      };
356    }
357
358    // run all threads
359    for (int i = 0; i < numThreads; i++) {
360      all[i].start();
361    }
362
363    // wait for all threads to finish
364    for (int i = 0; i < numThreads; i++) {
365      try {
366        all[i].join();
367      } catch (InterruptedException e) {
368      }
369    }
370    assertEquals(0, failures.get());
371    Get g = new Get(row);
372    Result result = region.get(g);
373    assertEquals(10000, result.getValue(fam1, qual1).length);
374    assertEquals(10000, result.getValue(fam1, qual2).length);
375    assertEquals(10000, result.getValue(fam2, qual3).length);
376  }
377  /**
378   * Test multi-threaded row mutations.
379   */
380  @Test
381  public void testRowMutationMultiThreads() throws IOException {
382    LOG.info("Starting test testRowMutationMultiThreads");
383    initHRegion(tableName, name.getMethodName(), fam1);
384
385    // create 10 threads, each will alternate between adding and
386    // removing a column
387    int numThreads = 10;
388    int opsPerThread = 250;
389    AtomicOperation[] all = new AtomicOperation[numThreads];
390
391    AtomicLong timeStamps = new AtomicLong(0);
392    AtomicInteger failures = new AtomicInteger(0);
393    // create all threads
394    for (int i = 0; i < numThreads; i++) {
395      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
396        @Override
397        public void run() {
398          boolean op = true;
399          for (int i=0; i<numOps; i++) {
400            try {
401              // throw in some flushes
402              if (i%10==0) {
403                synchronized(region) {
404                  LOG.debug("flushing");
405                  region.flush(true);
406                  if (i%100==0) {
407                    region.compact(false);
408                  }
409                }
410              }
411              long ts = timeStamps.incrementAndGet();
412              RowMutations rm = new RowMutations(row);
413              if (op) {
414                Put p = new Put(row, ts);
415                p.addColumn(fam1, qual1, value1);
416                p.setDurability(Durability.ASYNC_WAL);
417                rm.add(p);
418                Delete d = new Delete(row);
419                d.addColumns(fam1, qual2, ts);
420                d.setDurability(Durability.ASYNC_WAL);
421                rm.add(d);
422              } else {
423                Delete d = new Delete(row);
424                d.addColumns(fam1, qual1, ts);
425                d.setDurability(Durability.ASYNC_WAL);
426                rm.add(d);
427                Put p = new Put(row, ts);
428                p.addColumn(fam1, qual2, value2);
429                p.setDurability(Durability.ASYNC_WAL);
430                rm.add(p);
431              }
432              region.mutateRow(rm);
433              op ^= true;
434              // check: should always see exactly one column
435              Get g = new Get(row);
436              Result r = region.get(g);
437              if (r.size() != 1) {
438                LOG.debug(Objects.toString(r));
439                failures.incrementAndGet();
440                fail();
441              }
442            } catch (IOException e) {
443              e.printStackTrace();
444              failures.incrementAndGet();
445              fail();
446            }
447          }
448        }
449      };
450    }
451
452    // run all threads
453    for (int i = 0; i < numThreads; i++) {
454      all[i].start();
455    }
456
457    // wait for all threads to finish
458    for (int i = 0; i < numThreads; i++) {
459      try {
460        all[i].join();
461      } catch (InterruptedException e) {
462      }
463    }
464    assertEquals(0, failures.get());
465  }
466
467
468  /**
469   * Test multi-threaded region mutations.
470   */
471  @Test
472  public void testMultiRowMutationMultiThreads() throws IOException {
473
474    LOG.info("Starting test testMultiRowMutationMultiThreads");
475    initHRegion(tableName, name.getMethodName(), fam1);
476
477    // create 10 threads, each will alternate between adding and
478    // removing a column
479    int numThreads = 10;
480    int opsPerThread = 250;
481    AtomicOperation[] all = new AtomicOperation[numThreads];
482
483    AtomicLong timeStamps = new AtomicLong(0);
484    AtomicInteger failures = new AtomicInteger(0);
485    final List<byte[]> rowsToLock = Arrays.asList(row, row2);
486    // create all threads
487    for (int i = 0; i < numThreads; i++) {
488      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
489        @Override
490        public void run() {
491          boolean op = true;
492          for (int i=0; i<numOps; i++) {
493            try {
494              // throw in some flushes
495              if (i%10==0) {
496                synchronized(region) {
497                  LOG.debug("flushing");
498                  region.flush(true);
499                  if (i%100==0) {
500                    region.compact(false);
501                  }
502                }
503              }
504              long ts = timeStamps.incrementAndGet();
505              List<Mutation> mrm = new ArrayList<>();
506              if (op) {
507                Put p = new Put(row2, ts);
508                p.addColumn(fam1, qual1, value1);
509                p.setDurability(Durability.ASYNC_WAL);
510                mrm.add(p);
511                Delete d = new Delete(row);
512                d.addColumns(fam1, qual1, ts);
513                d.setDurability(Durability.ASYNC_WAL);
514                mrm.add(d);
515              } else {
516                Delete d = new Delete(row2);
517                d.addColumns(fam1, qual1, ts);
518                d.setDurability(Durability.ASYNC_WAL);
519                mrm.add(d);
520                Put p = new Put(row, ts);
521                p.setDurability(Durability.ASYNC_WAL);
522                p.addColumn(fam1, qual1, value2);
523                mrm.add(p);
524              }
525              region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
526              op ^= true;
527              // check: should always see exactly one column
528              Scan s = new Scan(row);
529              RegionScanner rs = region.getScanner(s);
530              List<Cell> r = new ArrayList<>();
531              while (rs.next(r))
532                ;
533              rs.close();
534              if (r.size() != 1) {
535                LOG.debug(Objects.toString(r));
536                failures.incrementAndGet();
537                fail();
538              }
539            } catch (IOException e) {
540              e.printStackTrace();
541              failures.incrementAndGet();
542              fail();
543            }
544          }
545        }
546      };
547    }
548
549    // run all threads
550    for (int i = 0; i < numThreads; i++) {
551      all[i].start();
552    }
553
554    // wait for all threads to finish
555    for (int i = 0; i < numThreads; i++) {
556      try {
557        all[i].join();
558      } catch (InterruptedException e) {
559      }
560    }
561    assertEquals(0, failures.get());
562  }
563
564  public static class AtomicOperation extends Thread {
565    protected final HRegion region;
566    protected final int numOps;
567    protected final AtomicLong timeStamps;
568    protected final AtomicInteger failures;
569    protected final Random r = new Random();
570
571    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
572        AtomicInteger failures) {
573      this.region = region;
574      this.numOps = numOps;
575      this.timeStamps = timeStamps;
576      this.failures = failures;
577    }
578  }
579
580  private static CountDownLatch latch = new CountDownLatch(1);
581  private enum TestStep {
582    INIT,                  // initial put of 10 to set value of the cell
583    PUT_STARTED,           // began doing a put of 50 to cell
584    PUT_COMPLETED,         // put complete (released RowLock, but may not have advanced MVCC).
585    CHECKANDPUT_STARTED,   // began checkAndPut: if 10 -> 11
586    CHECKANDPUT_COMPLETED  // completed checkAndPut
587    // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
588  }
589  private static volatile TestStep testStep = TestStep.INIT;
590  private final String family = "f1";
591
592  /**
593   * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
594   * MVCC.
595   *
596   * Moved into TestAtomicOperation from its original location, TestHBase7051
597   */
598  @Test
599  public void testPutAndCheckAndPutInParallel() throws Exception {
600    Configuration conf = TEST_UTIL.getConfiguration();
601    conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
602    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()))
603        .addFamily(new HColumnDescriptor(family));
604    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
605    Put[] puts = new Put[1];
606    Put put = new Put(Bytes.toBytes("r1"));
607    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
608    puts[0] = put;
609
610    region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
611    MultithreadedTestUtil.TestContext ctx =
612      new MultithreadedTestUtil.TestContext(conf);
613    ctx.addThread(new PutThread(ctx, region));
614    ctx.addThread(new CheckAndPutThread(ctx, region));
615    ctx.startThreads();
616    while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
617      Thread.sleep(100);
618    }
619    ctx.stop();
620    Scan s = new Scan();
621    RegionScanner scanner = region.getScanner(s);
622    List<Cell> results = new ArrayList<>();
623    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
624    scanner.next(results, scannerContext);
625    for (Cell keyValue : results) {
626      assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
627    }
628  }
629
630  private class PutThread extends TestThread {
631    private Region region;
632    PutThread(TestContext ctx, Region region) {
633      super(ctx);
634      this.region = region;
635    }
636
637    @Override
638    public void doWork() throws Exception {
639      Put[] puts = new Put[1];
640      Put put = new Put(Bytes.toBytes("r1"));
641      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
642      puts[0] = put;
643      testStep = TestStep.PUT_STARTED;
644      region.batchMutate(puts);
645    }
646  }
647
648  private class CheckAndPutThread extends TestThread {
649    private Region region;
650    CheckAndPutThread(TestContext ctx, Region region) {
651      super(ctx);
652      this.region = region;
653   }
654
655    @Override
656    public void doWork() throws Exception {
657      Put[] puts = new Put[1];
658      Put put = new Put(Bytes.toBytes("r1"));
659      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
660      puts[0] = put;
661      while (testStep != TestStep.PUT_COMPLETED) {
662        Thread.sleep(100);
663      }
664      testStep = TestStep.CHECKANDPUT_STARTED;
665      region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
666        CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put);
667      testStep = TestStep.CHECKANDPUT_COMPLETED;
668    }
669  }
670
671  public static class MockHRegion extends HRegion {
672
673    public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
674        final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) {
675      super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
676    }
677
678    @Override
679    public RowLock getRowLockInternal(final byte[] row, boolean readLock,
680        final RowLock prevRowlock) throws IOException {
681      if (testStep == TestStep.CHECKANDPUT_STARTED) {
682        latch.countDown();
683      }
684      return new WrappedRowLock(super.getRowLockInternal(row, readLock, null));
685    }
686
687    public class WrappedRowLock implements RowLock {
688
689      private final RowLock rowLock;
690
691      private WrappedRowLock(RowLock rowLock) {
692        this.rowLock = rowLock;
693      }
694
695
696      @Override
697      public void release() {
698        if (testStep == TestStep.INIT) {
699          this.rowLock.release();
700          return;
701        }
702
703        if (testStep == TestStep.PUT_STARTED) {
704          try {
705            testStep = TestStep.PUT_COMPLETED;
706            this.rowLock.release();
707            // put has been written to the memstore and the row lock has been released, but the
708            // MVCC has not been advanced.  Prior to fixing HBASE-7051, the following order of
709            // operations would cause the non-atomicity to show up:
710            // 1) Put releases row lock (where we are now)
711            // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
712            //    because the MVCC has not advanced
713            // 3) Put advances MVCC
714            // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
715            // (see below), and then wait some more to give the checkAndPut time to read the old
716            // value.
717            latch.await();
718            Thread.sleep(1000);
719          } catch (InterruptedException e) {
720            Thread.currentThread().interrupt();
721          }
722        }
723        else if (testStep == TestStep.CHECKANDPUT_STARTED) {
724          this.rowLock.release();
725        }
726      }
727    }
728  }
729}