001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertNotNull;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025import static org.junit.jupiter.api.Assertions.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.List;
031import java.util.Objects;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.CompareOperator;
041import org.apache.hadoop.hbase.HBaseTestingUtil;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.MultithreadedTestUtil;
044import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
045import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.Append;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.Delete;
051import org.apache.hadoop.hbase.client.Durability;
052import org.apache.hadoop.hbase.client.Get;
053import org.apache.hadoop.hbase.client.Increment;
054import org.apache.hadoop.hbase.client.IsolationLevel;
055import org.apache.hadoop.hbase.client.Mutation;
056import org.apache.hadoop.hbase.client.Put;
057import org.apache.hadoop.hbase.client.RegionInfo;
058import org.apache.hadoop.hbase.client.RegionInfoBuilder;
059import org.apache.hadoop.hbase.client.Result;
060import org.apache.hadoop.hbase.client.RowMutations;
061import org.apache.hadoop.hbase.client.Scan;
062import org.apache.hadoop.hbase.client.TableDescriptor;
063import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
064import org.apache.hadoop.hbase.filter.BinaryComparator;
065import org.apache.hadoop.hbase.io.HeapSize;
066import org.apache.hadoop.hbase.io.hfile.BlockCache;
067import org.apache.hadoop.hbase.io.hfile.CacheConfig;
068import org.apache.hadoop.hbase.testclassification.LargeTests;
069import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.wal.WAL;
072import org.junit.jupiter.api.AfterEach;
073import org.junit.jupiter.api.BeforeEach;
074import org.junit.jupiter.api.Tag;
075import org.junit.jupiter.api.Test;
076import org.junit.jupiter.api.TestInfo;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080/**
081 * Testing of HRegion.incrementColumnValue, HRegion.increment, and HRegion.append
082 */
083@Tag(VerySlowRegionServerTests.TAG)
084@Tag(LargeTests.TAG) // Starts 100 threads
085public class TestAtomicOperation {
086
087  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class);
088  private String name;
089
090  HRegion region = null;
091  private HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
092
093  // Test names
094  static byte[] tableName;
095  static final byte[] qual1 = Bytes.toBytes("qual1");
096  static final byte[] qual2 = Bytes.toBytes("qual2");
097  static final byte[] qual3 = Bytes.toBytes("qual3");
098  static final byte[] value1 = Bytes.toBytes("value1");
099  static final byte[] value2 = Bytes.toBytes("value2");
100  static final byte[] row = Bytes.toBytes("rowA");
101  static final byte[] row2 = Bytes.toBytes("rowB");
102
103  @BeforeEach
104  public void setup(TestInfo testInfo) {
105    this.name = testInfo.getTestMethod().get().getName();
106    tableName = Bytes.toBytes(name);
107  }
108
109  @AfterEach
110  public void teardown() throws IOException {
111    if (region != null) {
112      CacheConfig cacheConfig = region.getStores().get(0).getCacheConfig();
113      region.close();
114      WAL wal = region.getWAL();
115      if (wal != null) {
116        wal.close();
117      }
118      cacheConfig.getBlockCache().ifPresent(BlockCache::shutdown);
119      region = null;
120    }
121  }
122
123  //////////////////////////////////////////////////////////////////////////////
124  // New tests that doesn't spin up a mini cluster but rather just test the
125  // individual code pieces in the HRegion.
126  //////////////////////////////////////////////////////////////////////////////
127
128  /**
129   * Test basic append operation. More tests in
130   * {@link org.apache.hadoop.hbase.client.FromClientSideTest5#testAppend()}.
131   */
132  @Test
133  public void testAppend() throws IOException {
134    initHRegion(tableName, name, fam1);
135    String v1 =
136      "Ultimate Answer to the Ultimate Question of Life," + " The Universe, and Everything";
137    String v2 = " is... 42.";
138    Append a = new Append(row);
139    a.setReturnResults(false);
140    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
141    a.addColumn(fam1, qual2, Bytes.toBytes(v2));
142    assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty());
143    a = new Append(row);
144    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
145    a.addColumn(fam1, qual2, Bytes.toBytes(v1));
146    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
147    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), result.getValue(fam1, qual1)));
148    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), result.getValue(fam1, qual2)));
149  }
150
151  @Test
152  public void testAppendWithMultipleFamilies() throws IOException {
153    final byte[] fam3 = Bytes.toBytes("colfamily31");
154    initHRegion(tableName, name, fam1, fam2, fam3);
155    String v1 = "Appended";
156    String v2 = "Value";
157
158    Append a = new Append(row);
159    a.setReturnResults(false);
160    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
161    a.addColumn(fam2, qual2, Bytes.toBytes(v2));
162    Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
163    assertTrue(result.isEmpty(),
164      "Expected an empty result but result contains " + result.size() + " keys");
165
166    a = new Append(row);
167    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
168    a.addColumn(fam1, qual1, Bytes.toBytes(v2));
169    a.addColumn(fam3, qual3, Bytes.toBytes(v2));
170    a.addColumn(fam1, qual2, Bytes.toBytes(v1));
171
172    result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
173
174    byte[] actualValue1 = result.getValue(fam1, qual1);
175    byte[] actualValue2 = result.getValue(fam2, qual2);
176    byte[] actualValue3 = result.getValue(fam3, qual3);
177    byte[] actualValue4 = result.getValue(fam1, qual2);
178
179    assertNotNull(actualValue1, "Value1 should bot be null");
180    assertNotNull(actualValue2, "Value2 should bot be null");
181    assertNotNull(actualValue3, "Value3 should bot be null");
182    assertNotNull(actualValue4, "Value4 should bot be null");
183    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), actualValue1));
184    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), actualValue2));
185    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2), actualValue3));
186    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1), actualValue4));
187  }
188
189  @Test
190  public void testAppendWithNonExistingFamily() throws IOException {
191    initHRegion(tableName, name, fam1);
192    final String v1 = "Value";
193    final Append a = new Append(row);
194    a.addColumn(fam1, qual1, Bytes.toBytes(v1));
195    a.addColumn(fam2, qual2, Bytes.toBytes(v1));
196    Result result = null;
197    try {
198      result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
199      fail("Append operation should fail with NoSuchColumnFamilyException.");
200    } catch (NoSuchColumnFamilyException e) {
201      assertEquals(null, result);
202    } catch (Exception e) {
203      fail("Append operation should fail with NoSuchColumnFamilyException.");
204    }
205  }
206
207  @Test
208  public void testIncrementWithNonExistingFamily() throws IOException {
209    initHRegion(tableName, name, fam1);
210    final Increment inc = new Increment(row);
211    inc.addColumn(fam1, qual1, 1);
212    inc.addColumn(fam2, qual2, 1);
213    inc.setDurability(Durability.ASYNC_WAL);
214    try {
215      region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
216    } catch (NoSuchColumnFamilyException e) {
217      final Get g = new Get(row);
218      final Result result = region.get(g);
219      assertEquals(null, result.getValue(fam1, qual1));
220      assertEquals(null, result.getValue(fam2, qual2));
221    } catch (Exception e) {
222      fail("Increment operation should fail with NoSuchColumnFamilyException.");
223    }
224  }
225
226  /**
227   * Test multi-threaded increments.
228   */
229  @Test
230  public void testIncrementMultiThreads() throws IOException {
231    boolean fast = true;
232    LOG.info("Starting test testIncrementMultiThreads");
233    // run a with mixed column families (1 and 3 versions)
234    initHRegion(tableName, name, new int[] { 1, 3 }, fam1, fam2);
235
236    // Create 100 threads, each will increment by its own quantity. All 100 threads update the
237    // same row over two column families.
238    int numThreads = 100;
239    int incrementsPerThread = 1000;
240    Incrementer[] all = new Incrementer[numThreads];
241    int expectedTotal = 0;
242    // create all threads
243    for (int i = 0; i < numThreads; i++) {
244      all[i] = new Incrementer(region, i, i, incrementsPerThread);
245      expectedTotal += (i * incrementsPerThread);
246    }
247
248    // run all threads
249    for (int i = 0; i < numThreads; i++) {
250      all[i].start();
251    }
252
253    // wait for all threads to finish
254    for (int i = 0; i < numThreads; i++) {
255      try {
256        all[i].join();
257      } catch (InterruptedException e) {
258        LOG.info("Ignored", e);
259      }
260    }
261    assertICV(row, fam1, qual1, expectedTotal, fast);
262    assertICV(row, fam1, qual2, expectedTotal * 2, fast);
263    assertICV(row, fam2, qual3, expectedTotal * 3, fast);
264    LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
265  }
266
267  private void assertICV(byte[] row, byte[] familiy, byte[] qualifier, long amount, boolean fast)
268    throws IOException {
269    // run a get and see?
270    Get get = new Get(row);
271    if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
272    get.addColumn(familiy, qualifier);
273    Result result = region.get(get);
274    assertEquals(1, result.size());
275
276    Cell kv = result.rawCells()[0];
277    long r = Bytes.toLong(CellUtil.cloneValue(kv));
278    assertEquals(amount, r);
279  }
280
281  private void initHRegion(byte[] tableName, String callingMethod, byte[]... families)
282    throws IOException {
283    initHRegion(tableName, callingMethod, null, families);
284  }
285
286  private void initHRegion(byte[] tableName, String callingMethod, int[] maxVersions,
287    byte[]... families) throws IOException {
288    TableDescriptorBuilder builder =
289      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
290
291    int i = 0;
292    for (byte[] family : families) {
293      ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(family)
294        .setMaxVersions(maxVersions != null ? maxVersions[i++] : 1).build();
295      builder.setColumnFamily(familyDescriptor);
296    }
297    TableDescriptor tableDescriptor = builder.build();
298    RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
299    region = TEST_UTIL.createLocalHRegion(info, tableDescriptor);
300  }
301
302  /**
303   * A thread that makes increment calls always on the same row, this.row against two column
304   * families on this row.
305   */
306  public static class Incrementer extends Thread {
307
308    private final Region region;
309    private final int numIncrements;
310    private final int amount;
311
312    public Incrementer(Region region, int threadNumber, int amount, int numIncrements) {
313      super("Incrementer." + threadNumber);
314      this.region = region;
315      this.numIncrements = numIncrements;
316      this.amount = amount;
317      setDaemon(true);
318    }
319
320    @Override
321    public void run() {
322      for (int i = 0; i < numIncrements; i++) {
323        try {
324          Increment inc = new Increment(row);
325          inc.addColumn(fam1, qual1, amount);
326          inc.addColumn(fam1, qual2, amount * 2);
327          inc.addColumn(fam2, qual3, amount * 3);
328          inc.setDurability(Durability.ASYNC_WAL);
329          Result result = region.increment(inc);
330          if (result != null) {
331            assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2,
332              Bytes.toLong(result.getValue(fam1, qual2)));
333            assertTrue(result.getValue(fam2, qual3) != null);
334            assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 3,
335              Bytes.toLong(result.getValue(fam2, qual3)));
336            assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2,
337              Bytes.toLong(result.getValue(fam1, qual2)));
338            long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1)) * 3;
339            long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
340            assertEquals(fam1Increment, fam2Increment,
341              "fam1=" + fam1Increment + ", fam2=" + fam2Increment);
342          }
343        } catch (IOException e) {
344          e.printStackTrace();
345        }
346      }
347    }
348  }
349
350  @Test
351  public void testAppendMultiThreads() throws IOException {
352    LOG.info("Starting test testAppendMultiThreads");
353    // run a with mixed column families (1 and 3 versions)
354    initHRegion(tableName, name, new int[] { 1, 3 }, fam1, fam2);
355
356    int numThreads = 100;
357    int opsPerThread = 100;
358    AtomicOperation[] all = new AtomicOperation[numThreads];
359    final byte[] val = new byte[] { 1 };
360
361    AtomicInteger failures = new AtomicInteger(0);
362    // create all threads
363    for (int i = 0; i < numThreads; i++) {
364      all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
365        @Override
366        public void run() {
367          for (int i = 0; i < numOps; i++) {
368            try {
369              Append a = new Append(row);
370              a.addColumn(fam1, qual1, val);
371              a.addColumn(fam1, qual2, val);
372              a.addColumn(fam2, qual3, val);
373              a.setDurability(Durability.ASYNC_WAL);
374              region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
375
376              Get g = new Get(row);
377              Result result = region.get(g);
378              assertEquals(result.getValue(fam1, qual1).length,
379                result.getValue(fam1, qual2).length);
380              assertEquals(result.getValue(fam1, qual1).length,
381                result.getValue(fam2, qual3).length);
382            } catch (IOException e) {
383              e.printStackTrace();
384              failures.incrementAndGet();
385              fail();
386            }
387          }
388        }
389      };
390    }
391
392    // run all threads
393    for (int i = 0; i < numThreads; i++) {
394      all[i].start();
395    }
396
397    // wait for all threads to finish
398    for (int i = 0; i < numThreads; i++) {
399      try {
400        all[i].join();
401      } catch (InterruptedException e) {
402      }
403    }
404    assertEquals(0, failures.get());
405    Get g = new Get(row);
406    Result result = region.get(g);
407    assertEquals(10000, result.getValue(fam1, qual1).length);
408    assertEquals(10000, result.getValue(fam1, qual2).length);
409    assertEquals(10000, result.getValue(fam2, qual3).length);
410  }
411
412  /**
413   * Test multi-threaded row mutations.
414   */
415  @Test
416  public void testRowMutationMultiThreads() throws IOException {
417    LOG.info("Starting test testRowMutationMultiThreads");
418    initHRegion(tableName, name, fam1);
419
420    // create 10 threads, each will alternate between adding and
421    // removing a column
422    int numThreads = 10;
423    int opsPerThread = 250;
424    AtomicOperation[] all = new AtomicOperation[numThreads];
425
426    AtomicLong timeStamps = new AtomicLong(0);
427    AtomicInteger failures = new AtomicInteger(0);
428    // create all threads
429    for (int i = 0; i < numThreads; i++) {
430      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
431        @Override
432        public void run() {
433          boolean op = true;
434          for (int i = 0; i < numOps; i++) {
435            try {
436              // throw in some flushes
437              if (i % 10 == 0) {
438                synchronized (region) {
439                  LOG.debug("flushing");
440                  region.flush(true);
441                  if (i % 100 == 0) {
442                    region.compact(false);
443                  }
444                }
445              }
446              long ts = timeStamps.incrementAndGet();
447              RowMutations rm = new RowMutations(row);
448              if (op) {
449                Put p = new Put(row, ts);
450                p.addColumn(fam1, qual1, value1);
451                p.setDurability(Durability.ASYNC_WAL);
452                rm.add(p);
453                Delete d = new Delete(row);
454                d.addColumns(fam1, qual2, ts);
455                d.setDurability(Durability.ASYNC_WAL);
456                rm.add(d);
457              } else {
458                Delete d = new Delete(row);
459                d.addColumns(fam1, qual1, ts);
460                d.setDurability(Durability.ASYNC_WAL);
461                rm.add(d);
462                Put p = new Put(row, ts);
463                p.addColumn(fam1, qual2, value2);
464                p.setDurability(Durability.ASYNC_WAL);
465                rm.add(p);
466              }
467              region.mutateRow(rm);
468              op ^= true;
469              // check: should always see exactly one column
470              Get g = new Get(row);
471              Result r = region.get(g);
472              if (r.size() != 1) {
473                LOG.debug(Objects.toString(r));
474                failures.incrementAndGet();
475                fail();
476              }
477            } catch (IOException e) {
478              e.printStackTrace();
479              failures.incrementAndGet();
480              fail();
481            }
482          }
483        }
484      };
485    }
486
487    // run all threads
488    for (int i = 0; i < numThreads; i++) {
489      all[i].start();
490    }
491
492    // wait for all threads to finish
493    for (int i = 0; i < numThreads; i++) {
494      try {
495        all[i].join();
496      } catch (InterruptedException e) {
497      }
498    }
499    assertEquals(0, failures.get());
500  }
501
502  /**
503   * Test multi-threaded region mutations.
504   */
505  @Test
506  public void testMultiRowMutationMultiThreads() throws IOException {
507
508    LOG.info("Starting test testMultiRowMutationMultiThreads");
509    initHRegion(tableName, name, fam1);
510
511    // create 10 threads, each will alternate between adding and
512    // removing a column
513    int numThreads = 10;
514    int opsPerThread = 250;
515    AtomicOperation[] all = new AtomicOperation[numThreads];
516
517    AtomicLong timeStamps = new AtomicLong(0);
518    AtomicInteger failures = new AtomicInteger(0);
519    final List<byte[]> rowsToLock = Arrays.asList(row, row2);
520    // create all threads
521    for (int i = 0; i < numThreads; i++) {
522      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
523        @Override
524        public void run() {
525          boolean op = true;
526          for (int i = 0; i < numOps; i++) {
527            try {
528              // throw in some flushes
529              if (i % 10 == 0) {
530                synchronized (region) {
531                  LOG.debug("flushing");
532                  region.flush(true);
533                  if (i % 100 == 0) {
534                    region.compact(false);
535                  }
536                }
537              }
538              long ts = timeStamps.incrementAndGet();
539              List<Mutation> mrm = new ArrayList<>();
540              if (op) {
541                Put p = new Put(row2, ts);
542                p.addColumn(fam1, qual1, value1);
543                p.setDurability(Durability.ASYNC_WAL);
544                mrm.add(p);
545                Delete d = new Delete(row);
546                d.addColumns(fam1, qual1, ts);
547                d.setDurability(Durability.ASYNC_WAL);
548                mrm.add(d);
549              } else {
550                Delete d = new Delete(row2);
551                d.addColumns(fam1, qual1, ts);
552                d.setDurability(Durability.ASYNC_WAL);
553                mrm.add(d);
554                Put p = new Put(row, ts);
555                p.setDurability(Durability.ASYNC_WAL);
556                p.addColumn(fam1, qual1, value2);
557                mrm.add(p);
558              }
559              region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
560              op ^= true;
561              // check: should always see exactly one column
562              Scan s = new Scan().withStartRow(row);
563              RegionScanner rs = region.getScanner(s);
564              List<Cell> r = new ArrayList<>();
565              while (rs.next(r))
566                ;
567              rs.close();
568              if (r.size() != 1) {
569                LOG.debug(Objects.toString(r));
570                failures.incrementAndGet();
571                fail();
572              }
573            } catch (IOException e) {
574              e.printStackTrace();
575              failures.incrementAndGet();
576              fail();
577            }
578          }
579        }
580      };
581    }
582
583    // run all threads
584    for (int i = 0; i < numThreads; i++) {
585      all[i].start();
586    }
587
588    // wait for all threads to finish
589    for (int i = 0; i < numThreads; i++) {
590      try {
591        all[i].join();
592      } catch (InterruptedException e) {
593      }
594    }
595    assertEquals(0, failures.get());
596  }
597
598  public static class AtomicOperation extends Thread {
599    protected final HRegion region;
600    protected final int numOps;
601    protected final AtomicLong timeStamps;
602    protected final AtomicInteger failures;
603
604    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
605      AtomicInteger failures) {
606      this.region = region;
607      this.numOps = numOps;
608      this.timeStamps = timeStamps;
609      this.failures = failures;
610    }
611  }
612
613  private static CountDownLatch latch = new CountDownLatch(1);
614
615  private enum TestStep {
616    INIT, // initial put of 10 to set value of the cell
617    PUT_STARTED, // began doing a put of 50 to cell
618    PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC).
619    CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11
620    CHECKANDPUT_COMPLETED // completed checkAndPut
621    // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
622  }
623
624  private static volatile TestStep testStep = TestStep.INIT;
625  private final String family = "f1";
626
627  /**
628   * Test written as a verifier for HBASE-7051, CheckAndPut should properly read MVCC. Moved into
629   * TestAtomicOperation from its original location, TestHBase7051
630   */
631  @Test
632  public void testPutAndCheckAndPutInParallel() throws Exception {
633    Configuration conf = TEST_UTIL.getConfiguration();
634    conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
635    TableDescriptorBuilder tableDescriptorBuilder =
636      TableDescriptorBuilder.newBuilder(TableName.valueOf(name));
637    ColumnFamilyDescriptor columnFamilyDescriptor =
638      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build();
639    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
640    this.region = TEST_UTIL.createLocalHRegion(tableDescriptorBuilder.build(), null, null);
641    Put[] puts = new Put[1];
642    Put put = new Put(Bytes.toBytes("r1"));
643    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
644    puts[0] = put;
645
646    region.batchMutate(puts);
647    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
648    ctx.addThread(new PutThread(ctx, region));
649    ctx.addThread(new CheckAndPutThread(ctx, region));
650    ctx.startThreads();
651    while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
652      Thread.sleep(100);
653    }
654    ctx.stop();
655    Scan s = new Scan();
656    RegionScanner scanner = region.getScanner(s);
657    List<Cell> results = new ArrayList<>();
658    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
659    scanner.next(results, scannerContext);
660    for (Cell keyValue : results) {
661      assertEquals("50", Bytes.toString(CellUtil.cloneValue(keyValue)));
662    }
663  }
664
665  private class PutThread extends TestThread {
666    private Region region;
667
668    PutThread(TestContext ctx, Region region) {
669      super(ctx);
670      this.region = region;
671    }
672
673    @Override
674    public void doWork() throws Exception {
675      Put[] puts = new Put[1];
676      Put put = new Put(Bytes.toBytes("r1"));
677      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
678      puts[0] = put;
679      testStep = TestStep.PUT_STARTED;
680      region.batchMutate(puts);
681    }
682  }
683
684  private class CheckAndPutThread extends TestThread {
685    private Region region;
686
687    CheckAndPutThread(TestContext ctx, Region region) {
688      super(ctx);
689      this.region = region;
690    }
691
692    @Override
693    public void doWork() throws Exception {
694      Put[] puts = new Put[1];
695      Put put = new Put(Bytes.toBytes("r1"));
696      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
697      puts[0] = put;
698      while (testStep != TestStep.PUT_COMPLETED) {
699        Thread.sleep(100);
700      }
701      testStep = TestStep.CHECKANDPUT_STARTED;
702      region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
703        CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put);
704      testStep = TestStep.CHECKANDPUT_COMPLETED;
705    }
706  }
707
708  public static class MockHRegion extends HRegion {
709
710    public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
711      final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) {
712      super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
713    }
714
715    @Override
716    protected RowLock getRowLockInternal(final byte[] row, boolean readLock,
717      final RowLock prevRowlock) throws IOException {
718      if (testStep == TestStep.CHECKANDPUT_STARTED) {
719        latch.countDown();
720      }
721      return new WrappedRowLock(super.getRowLockInternal(row, readLock, null));
722    }
723
724    public class WrappedRowLock implements RowLock {
725
726      private final RowLock rowLock;
727
728      private WrappedRowLock(RowLock rowLock) {
729        this.rowLock = rowLock;
730      }
731
732      @Override
733      public void release() {
734        if (testStep == TestStep.INIT) {
735          this.rowLock.release();
736          return;
737        }
738
739        if (testStep == TestStep.PUT_STARTED) {
740          try {
741            testStep = TestStep.PUT_COMPLETED;
742            this.rowLock.release();
743            // put has been written to the memstore and the row lock has been released, but the
744            // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
745            // operations would cause the non-atomicity to show up:
746            // 1) Put releases row lock (where we are now)
747            // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
748            // because the MVCC has not advanced
749            // 3) Put advances MVCC
750            // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
751            // (see below), and then wait some more to give the checkAndPut time to read the old
752            // value.
753            latch.await();
754            Thread.sleep(1000);
755          } catch (InterruptedException e) {
756            Thread.currentThread().interrupt();
757          }
758        } else if (testStep == TestStep.CHECKANDPUT_STARTED) {
759          this.rowLock.release();
760        }
761      }
762    }
763  }
764}