001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.ThreadLocalRandom;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.client.Durability;
033import org.apache.hadoop.hbase.client.Increment;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
036import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.junit.After;
040import org.junit.Before;
041import org.junit.ClassRule;
042import org.junit.Rule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.junit.rules.TestName;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Increments with some concurrency against a region to ensure we get the right answer.
051 * Test is parameterized to run the fast and slow path increments; if fast,
052 * HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY is true.
053 *
054 * <p>There is similar test up in TestAtomicOperation. It does a test where it has 100 threads
055 * doing increments across two column families all on one row and the increments are connected to
056 * prove atomicity on row.
057 */
058@Category(MediumTests.class)
059public class TestRegionIncrement {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063      HBaseClassTestRule.forClass(TestRegionIncrement.class);
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestRegionIncrement.class);
066  @Rule public TestName name = new TestName();
067  private static HBaseTestingUtility TEST_UTIL;
068  private final static byte [] INCREMENT_BYTES = Bytes.toBytes("increment");
069  private static final int THREAD_COUNT = 10;
070  private static final int INCREMENT_COUNT = 10000;
071
072  @Before
073  public void setUp() throws Exception {
074    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
075  }
076
077  @After
078  public void tearDown() throws Exception {
079    TEST_UTIL.cleanupTestDir();
080  }
081
082  private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
083    FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
084      TEST_UTIL.getDataTestDir().toString(), conf);
085    wal.init();
086    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
087      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
088    return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
089      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf,
090      false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
091  }
092
093  private void closeRegion(final HRegion region) throws IOException {
094    region.close();
095    region.getWAL().close();
096  }
097
098  @Test
099  public void testMVCCCausingMisRead() throws IOException {
100    final HRegion region = getRegion(TEST_UTIL.getConfiguration(), this.name.getMethodName());
101    try {
102      // ADD TEST HERE!!
103    } finally {
104      closeRegion(region);
105    }
106  }
107
108  /**
109   * Increments a single cell a bunch of times.
110   */
111  private static class SingleCellIncrementer extends Thread {
112    private final int count;
113    private final HRegion region;
114    private final Increment increment;
115
116    SingleCellIncrementer(final int i, final int count, final HRegion region,
117        final Increment increment) {
118      super("" + i);
119      setDaemon(true);
120      this.count = count;
121      this.region = region;
122      this.increment = increment;
123    }
124
125    @Override
126    public void run() {
127      for (int i = 0; i < this.count; i++) {
128        try {
129          this.region.increment(this.increment);
130          // LOG.info(getName() + " " + i);
131        } catch (IOException e) {
132          throw new RuntimeException(e);
133        }
134      }
135    }
136  }
137
138  /**
139   * Increments a random row's Cell <code>count</code> times.
140   */
141  private static class CrossRowCellIncrementer extends Thread {
142    private final int count;
143    private final HRegion region;
144    private final Increment [] increments;
145
146    CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) {
147      super("" + i);
148      setDaemon(true);
149      this.count = count;
150      this.region = region;
151      this.increments = new Increment[range];
152      for (int ii = 0; ii < range; ii++) {
153        this.increments[ii] = new Increment(Bytes.toBytes(i));
154        this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
155      }
156    }
157
158    @Override
159    public void run() {
160      for (int i = 0; i < this.count; i++) {
161        try {
162          int index = ThreadLocalRandom.current().nextInt(0, this.increments.length);
163          this.region.increment(this.increments[index]);
164          // LOG.info(getName() + " " + index);
165        } catch (IOException e) {
166          throw new RuntimeException(e);
167        }
168      }
169    }
170  }
171
172  /**
173   * Have each thread update its own Cell. Avoid contention with another thread.
174   * @throws IOException
175   * @throws InterruptedException
176   */
177  @Test
178  public void testUnContendedSingleCellIncrement()
179  throws IOException, InterruptedException {
180    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
181        TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
182    long startTime = System.currentTimeMillis();
183    try {
184      SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT];
185      for (int i = 0; i < threads.length; i++) {
186        byte [] rowBytes = Bytes.toBytes(i);
187        Increment increment = new Increment(rowBytes);
188        increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
189        threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment);
190      }
191      for (int i = 0; i < threads.length; i++) {
192        threads[i].start();
193      }
194      for (int i = 0; i < threads.length; i++) {
195        threads[i].join();
196      }
197      RegionScanner regionScanner = region.getScanner(new Scan());
198      List<Cell> cells = new ArrayList<>(THREAD_COUNT);
199      while(regionScanner.next(cells)) continue;
200      assertEquals(THREAD_COUNT, cells.size());
201      long total = 0;
202      for (Cell cell: cells) total +=
203        Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
204      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
205    } finally {
206      closeRegion(region);
207      LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
208    }
209  }
210
211  /**
212   * Have each thread update its own Cell. Avoid contention with another thread.
213   * This is
214   * @throws IOException
215   * @throws InterruptedException
216   */
217  @Test
218  public void testContendedAcrossCellsIncrement()
219  throws IOException, InterruptedException {
220    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
221        TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
222    long startTime = System.currentTimeMillis();
223    try {
224      CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT];
225      for (int i = 0; i < threads.length; i++) {
226        threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT);
227      }
228      for (int i = 0; i < threads.length; i++) {
229        threads[i].start();
230      }
231      for (int i = 0; i < threads.length; i++) {
232        threads[i].join();
233      }
234      RegionScanner regionScanner = region.getScanner(new Scan());
235      List<Cell> cells = new ArrayList<>(100);
236      while(regionScanner.next(cells)) continue;
237      assertEquals(THREAD_COUNT, cells.size());
238      long total = 0;
239      for (Cell cell: cells) total +=
240        Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
241      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
242    } finally {
243      closeRegion(region);
244      LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
245    }
246  }
247}