001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.ThreadLocalRandom;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.client.Durability;
033import org.apache.hadoop.hbase.client.Increment;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
036import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.wal.WAL;
040import org.junit.After;
041import org.junit.Before;
042import org.junit.ClassRule;
043import org.junit.Rule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.junit.rules.TestName;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * Increments with some concurrency against a region to ensure we get the right answer.
052 * Test is parameterized to run the fast and slow path increments; if fast,
053 * HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY is true.
054 *
055 * <p>There is similar test up in TestAtomicOperation. It does a test where it has 100 threads
056 * doing increments across two column families all on one row and the increments are connected to
057 * prove atomicity on row.
058 */
059@Category(MediumTests.class)
060public class TestRegionIncrement {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064      HBaseClassTestRule.forClass(TestRegionIncrement.class);
065
066  private static final Logger LOG = LoggerFactory.getLogger(TestRegionIncrement.class);
067  @Rule public TestName name = new TestName();
068  private static HBaseTestingUtility TEST_UTIL;
069  private final static byte [] INCREMENT_BYTES = Bytes.toBytes("increment");
070  private static final int THREAD_COUNT = 10;
071  private static final int INCREMENT_COUNT = 10000;
072
073  @Before
074  public void setUp() throws Exception {
075    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
076  }
077
078  @After
079  public void tearDown() throws Exception {
080    TEST_UTIL.cleanupTestDir();
081  }
082
083  private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
084    WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
085      TEST_UTIL.getDataTestDir().toString(), conf);
086    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
087    return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
088      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf,
089      false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
090  }
091
092  private void closeRegion(final HRegion region) throws IOException {
093    region.close();
094    region.getWAL().close();
095  }
096
097  @Test
098  public void testMVCCCausingMisRead() throws IOException {
099    final HRegion region = getRegion(TEST_UTIL.getConfiguration(), this.name.getMethodName());
100    try {
101      // ADD TEST HERE!!
102    } finally {
103      closeRegion(region);
104    }
105  }
106
107  /**
108   * Increments a single cell a bunch of times.
109   */
110  private static class SingleCellIncrementer extends Thread {
111    private final int count;
112    private final HRegion region;
113    private final Increment increment;
114
115    SingleCellIncrementer(final int i, final int count, final HRegion region,
116        final Increment increment) {
117      super("" + i);
118      setDaemon(true);
119      this.count = count;
120      this.region = region;
121      this.increment = increment;
122    }
123
124    @Override
125    public void run() {
126      for (int i = 0; i < this.count; i++) {
127        try {
128          this.region.increment(this.increment);
129          // LOG.info(getName() + " " + i);
130        } catch (IOException e) {
131          throw new RuntimeException(e);
132        }
133      }
134    }
135  }
136
137  /**
138   * Increments a random row's Cell <code>count</code> times.
139   */
140  private static class CrossRowCellIncrementer extends Thread {
141    private final int count;
142    private final HRegion region;
143    private final Increment [] increments;
144
145    CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) {
146      super("" + i);
147      setDaemon(true);
148      this.count = count;
149      this.region = region;
150      this.increments = new Increment[range];
151      for (int ii = 0; ii < range; ii++) {
152        this.increments[ii] = new Increment(Bytes.toBytes(i));
153        this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
154      }
155    }
156
157    @Override
158    public void run() {
159      for (int i = 0; i < this.count; i++) {
160        try {
161          int index = ThreadLocalRandom.current().nextInt(0, this.increments.length);
162          this.region.increment(this.increments[index]);
163          // LOG.info(getName() + " " + index);
164        } catch (IOException e) {
165          throw new RuntimeException(e);
166        }
167      }
168    }
169  }
170
171  /**
172   * Have each thread update its own Cell. Avoid contention with another thread.
173   * @throws IOException
174   * @throws InterruptedException
175   */
176  @Test
177  public void testUnContendedSingleCellIncrement()
178  throws IOException, InterruptedException {
179    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
180        TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
181    long startTime = System.currentTimeMillis();
182    try {
183      SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT];
184      for (int i = 0; i < threads.length; i++) {
185        byte [] rowBytes = Bytes.toBytes(i);
186        Increment increment = new Increment(rowBytes);
187        increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
188        threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment);
189      }
190      for (int i = 0; i < threads.length; i++) {
191        threads[i].start();
192      }
193      for (int i = 0; i < threads.length; i++) {
194        threads[i].join();
195      }
196      RegionScanner regionScanner = region.getScanner(new Scan());
197      List<Cell> cells = new ArrayList<>(THREAD_COUNT);
198      while(regionScanner.next(cells)) continue;
199      assertEquals(THREAD_COUNT, cells.size());
200      long total = 0;
201      for (Cell cell: cells) total +=
202        Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
203      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
204    } finally {
205      closeRegion(region);
206      LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
207    }
208  }
209
210  /**
211   * Have each thread update its own Cell. Avoid contention with another thread.
212   * This is
213   * @throws IOException
214   * @throws InterruptedException
215   */
216  @Test
217  public void testContendedAcrossCellsIncrement()
218  throws IOException, InterruptedException {
219    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
220        TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
221    long startTime = System.currentTimeMillis();
222    try {
223      CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT];
224      for (int i = 0; i < threads.length; i++) {
225        threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT);
226      }
227      for (int i = 0; i < threads.length; i++) {
228        threads[i].start();
229      }
230      for (int i = 0; i < threads.length; i++) {
231        threads[i].join();
232      }
233      RegionScanner regionScanner = region.getScanner(new Scan());
234      List<Cell> cells = new ArrayList<>(100);
235      while(regionScanner.next(cells)) continue;
236      assertEquals(THREAD_COUNT, cells.size());
237      long total = 0;
238      for (Cell cell: cells) total +=
239        Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
240      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
241    } finally {
242      closeRegion(region);
243      LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
244    }
245  }
246}