001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.ThreadLocalRandom;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Durability;
034import org.apache.hadoop.hbase.client.Increment;
035import org.apache.hadoop.hbase.client.Scan;
036import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
037import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.junit.After;
042import org.junit.Before;
043import org.junit.ClassRule;
044import org.junit.Rule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.junit.rules.TestName;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * Increments with some concurrency against a region to ensure we get the right answer.
053 * Test is parameterized to run the fast and slow path increments; if fast,
054 * HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY is true.
055 *
056 * <p>There is similar test up in TestAtomicOperation. It does a test where it has 100 threads
057 * doing increments across two column families all on one row and the increments are connected to
058 * prove atomicity on row.
059 */
060@Category(MediumTests.class)
061public class TestRegionIncrement {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestRegionIncrement.class);
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestRegionIncrement.class);
068  @Rule public TestName name = new TestName();
069  private static HBaseTestingUtil TEST_UTIL;
070  private final static byte [] INCREMENT_BYTES = Bytes.toBytes("increment");
071  private static final int THREAD_COUNT = 10;
072  private static final int INCREMENT_COUNT = 10000;
073
074  @Before
075  public void setUp() throws Exception {
076    TEST_UTIL = new HBaseTestingUtil();
077  }
078
079  @After
080  public void tearDown() throws Exception {
081    TEST_UTIL.cleanupTestDir();
082  }
083
084  private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
085    FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
086        TEST_UTIL.getDataTestDir().toString(), conf);
087    wal.init();
088    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
089      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
090    return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_BYTE_ARRAY,
091      HConstants.EMPTY_BYTE_ARRAY, conf, false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
092  }
093
094  private void closeRegion(final HRegion region) throws IOException {
095    region.close();
096    region.getWAL().close();
097  }
098
099  @Test
100  public void testMVCCCausingMisRead() throws IOException {
101    final HRegion region = getRegion(TEST_UTIL.getConfiguration(), this.name.getMethodName());
102    try {
103      // ADD TEST HERE!!
104    } finally {
105      closeRegion(region);
106    }
107  }
108
109  /**
110   * Increments a single cell a bunch of times.
111   */
112  private static class SingleCellIncrementer extends Thread {
113    private final int count;
114    private final HRegion region;
115    private final Increment increment;
116
117    SingleCellIncrementer(final int i, final int count, final HRegion region,
118        final Increment increment) {
119      super("" + i);
120      setDaemon(true);
121      this.count = count;
122      this.region = region;
123      this.increment = increment;
124    }
125
126    @Override
127    public void run() {
128      for (int i = 0; i < this.count; i++) {
129        try {
130          this.region.increment(this.increment);
131          // LOG.info(getName() + " " + i);
132        } catch (IOException e) {
133          throw new RuntimeException(e);
134        }
135      }
136    }
137  }
138
139  /**
140   * Increments a random row's Cell <code>count</code> times.
141   */
142  private static class CrossRowCellIncrementer extends Thread {
143    private final int count;
144    private final HRegion region;
145    private final Increment [] increments;
146
147    CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) {
148      super("" + i);
149      setDaemon(true);
150      this.count = count;
151      this.region = region;
152      this.increments = new Increment[range];
153      for (int ii = 0; ii < range; ii++) {
154        this.increments[ii] = new Increment(Bytes.toBytes(i));
155        this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
156      }
157    }
158
159    @Override
160    public void run() {
161      for (int i = 0; i < this.count; i++) {
162        try {
163          int index = ThreadLocalRandom.current().nextInt(0, this.increments.length);
164          this.region.increment(this.increments[index]);
165          // LOG.info(getName() + " " + index);
166        } catch (IOException e) {
167          throw new RuntimeException(e);
168        }
169      }
170    }
171  }
172
173  /**
174   * Have each thread update its own Cell. Avoid contention with another thread.
175   */
176  @Test
177  public void testUnContendedSingleCellIncrement()
178  throws IOException, InterruptedException {
179    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
180        TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
181    long startTime = EnvironmentEdgeManager.currentTime();
182    try {
183      SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT];
184      for (int i = 0; i < threads.length; i++) {
185        byte [] rowBytes = Bytes.toBytes(i);
186        Increment increment = new Increment(rowBytes);
187        increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
188        threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment);
189      }
190      for (int i = 0; i < threads.length; i++) {
191        threads[i].start();
192      }
193      for (int i = 0; i < threads.length; i++) {
194        threads[i].join();
195      }
196      RegionScanner regionScanner = region.getScanner(new Scan());
197      List<Cell> cells = new ArrayList<>(THREAD_COUNT);
198      while(regionScanner.next(cells)) continue;
199      assertEquals(THREAD_COUNT, cells.size());
200      long total = 0;
201      for (Cell cell: cells) total +=
202        Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
203      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
204    } finally {
205      closeRegion(region);
206      LOG.info(this.name.getMethodName() + " " +
207        (EnvironmentEdgeManager.currentTime() - startTime) + "ms");
208    }
209  }
210
211  /**
212   * Have each thread update its own Cell. Avoid contention with another thread.
213   */
214  @Test
215  public void testContendedAcrossCellsIncrement() throws IOException, InterruptedException {
216    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
217        TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
218    long startTime = EnvironmentEdgeManager.currentTime();
219    try {
220      CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT];
221      for (int i = 0; i < threads.length; i++) {
222        threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT);
223      }
224      for (int i = 0; i < threads.length; i++) {
225        threads[i].start();
226      }
227      for (int i = 0; i < threads.length; i++) {
228        threads[i].join();
229      }
230      RegionScanner regionScanner = region.getScanner(new Scan());
231      List<Cell> cells = new ArrayList<>(100);
232      while(regionScanner.next(cells)) continue;
233      assertEquals(THREAD_COUNT, cells.size());
234      long total = 0;
235      for (Cell cell: cells) total +=
236        Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
237      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
238    } finally {
239      closeRegion(region);
240      LOG.info(this.name.getMethodName() + " " +
241        (EnvironmentEdgeManager.currentTime() - startTime) + "ms");
242    }
243  }
244}