001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.ThreadLocalRandom;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Durability;
033import org.apache.hadoop.hbase.client.Increment;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
036import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.junit.jupiter.api.AfterEach;
041import org.junit.jupiter.api.BeforeEach;
042import org.junit.jupiter.api.Tag;
043import org.junit.jupiter.api.Test;
044import org.junit.jupiter.api.TestInfo;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * Increments with some concurrency against a region to ensure we get the right answer. Test is
050 * parameterized to run the fast and slow path increments; if fast,
051 * HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY is true.
052 * <p>
053 * There is similar test up in TestAtomicOperation. It does a test where it has 100 threads doing
054 * increments across two column families all on one row and the increments are connected to prove
055 * atomicity on row.
056 */
057@Tag(MediumTests.TAG)
058public class TestRegionIncrement {
059
060  private static final Logger LOG = LoggerFactory.getLogger(TestRegionIncrement.class);
061
062  private String name;
063  private static HBaseTestingUtil TEST_UTIL;
064  private final static byte[] INCREMENT_BYTES = Bytes.toBytes("increment");
065  private static final int THREAD_COUNT = 10;
066  private static final int INCREMENT_COUNT = 10000;
067
068  @BeforeEach
069  public void setUp(TestInfo testInfo) throws Exception {
070    TEST_UTIL = new HBaseTestingUtil();
071    name = testInfo.getTestMethod().get().getName();
072  }
073
074  @AfterEach
075  public void tearDown() throws Exception {
076    TEST_UTIL.cleanupTestDir();
077  }
078
079  private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
080    FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
081      TEST_UTIL.getDataTestDir().toString(), conf);
082    wal.init();
083    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
084      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
085    return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_BYTE_ARRAY,
086      HConstants.EMPTY_BYTE_ARRAY, conf, false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
087  }
088
089  private void closeRegion(final HRegion region) throws IOException {
090    region.close();
091    region.getWAL().close();
092  }
093
094  @Test
095  public void testMVCCCausingMisRead() throws IOException {
096    final HRegion region = getRegion(TEST_UTIL.getConfiguration(), this.name);
097    try {
098      // ADD TEST HERE!!
099    } finally {
100      closeRegion(region);
101    }
102  }
103
104  /**
105   * Increments a single cell a bunch of times.
106   */
107  private static class SingleCellIncrementer extends Thread {
108    private final int count;
109    private final HRegion region;
110    private final Increment increment;
111
112    SingleCellIncrementer(final int i, final int count, final HRegion region,
113      final Increment increment) {
114      super("" + i);
115      setDaemon(true);
116      this.count = count;
117      this.region = region;
118      this.increment = increment;
119    }
120
121    @Override
122    public void run() {
123      for (int i = 0; i < this.count; i++) {
124        try {
125          this.region.increment(this.increment);
126          // LOG.info(getName() + " " + i);
127        } catch (IOException e) {
128          throw new RuntimeException(e);
129        }
130      }
131    }
132  }
133
134  /**
135   * Increments a random row's Cell <code>count</code> times.
136   */
137  private static class CrossRowCellIncrementer extends Thread {
138    private final int count;
139    private final HRegion region;
140    private final Increment[] increments;
141
142    CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) {
143      super("" + i);
144      setDaemon(true);
145      this.count = count;
146      this.region = region;
147      this.increments = new Increment[range];
148      for (int ii = 0; ii < range; ii++) {
149        this.increments[ii] = new Increment(Bytes.toBytes(i));
150        this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
151      }
152    }
153
154    @Override
155    public void run() {
156      for (int i = 0; i < this.count; i++) {
157        try {
158          int index = ThreadLocalRandom.current().nextInt(0, this.increments.length);
159          this.region.increment(this.increments[index]);
160          // LOG.info(getName() + " " + index);
161        } catch (IOException e) {
162          throw new RuntimeException(e);
163        }
164      }
165    }
166  }
167
168  /**
169   * Have each thread update its own Cell. Avoid contention with another thread.
170   */
171  @Test
172  public void testUnContendedSingleCellIncrement() throws IOException, InterruptedException {
173    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
174      TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name));
175    long startTime = EnvironmentEdgeManager.currentTime();
176    try {
177      SingleCellIncrementer[] threads = new SingleCellIncrementer[THREAD_COUNT];
178      for (int i = 0; i < threads.length; i++) {
179        byte[] rowBytes = Bytes.toBytes(i);
180        Increment increment = new Increment(rowBytes);
181        increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
182        threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment);
183      }
184      for (int i = 0; i < threads.length; i++) {
185        threads[i].start();
186      }
187      for (int i = 0; i < threads.length; i++) {
188        threads[i].join();
189      }
190      RegionScanner regionScanner = region.getScanner(new Scan());
191      List<Cell> cells = new ArrayList<>(THREAD_COUNT);
192      while (regionScanner.next(cells))
193        continue;
194      assertEquals(THREAD_COUNT, cells.size());
195      long total = 0;
196      for (Cell cell : cells)
197        total += Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
198      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
199    } finally {
200      closeRegion(region);
201      LOG.info(this.name + " " + (EnvironmentEdgeManager.currentTime() - startTime) + "ms");
202    }
203  }
204
205  /**
206   * Have each thread update its own Cell. Avoid contention with another thread.
207   */
208  @Test
209  public void testContendedAcrossCellsIncrement() throws IOException, InterruptedException {
210    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
211      TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name));
212    long startTime = EnvironmentEdgeManager.currentTime();
213    try {
214      CrossRowCellIncrementer[] threads = new CrossRowCellIncrementer[THREAD_COUNT];
215      for (int i = 0; i < threads.length; i++) {
216        threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT);
217      }
218      for (int i = 0; i < threads.length; i++) {
219        threads[i].start();
220      }
221      for (int i = 0; i < threads.length; i++) {
222        threads[i].join();
223      }
224      RegionScanner regionScanner = region.getScanner(new Scan());
225      List<Cell> cells = new ArrayList<>(100);
226      while (regionScanner.next(cells))
227        continue;
228      assertEquals(THREAD_COUNT, cells.size());
229      long total = 0;
230      for (Cell cell : cells)
231        total += Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
232      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
233    } finally {
234      closeRegion(region);
235      LOG.info(this.name + " " + (EnvironmentEdgeManager.currentTime() - startTime) + "ms");
236    }
237  }
238}