001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor.example;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.io.UncheckedIOException;
025import java.util.stream.IntStream;
026import org.apache.hadoop.hbase.HBaseTestingUtility;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Get;
030import org.apache.hadoop.hbase.client.Increment;
031import org.apache.hadoop.hbase.client.Result;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.junit.AfterClass;
036import org.junit.BeforeClass;
037
038public class WriteHeavyIncrementObserverTestBase {
039
040  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
041
042  protected static TableName NAME = TableName.valueOf("TestCP");
043
044  protected static byte[] FAMILY = Bytes.toBytes("cf");
045
046  protected static byte[] ROW = Bytes.toBytes("row");
047
048  protected static byte[] CQ1 = Bytes.toBytes("cq1");
049
050  protected static byte[] CQ2 = Bytes.toBytes("cq2");
051
052  protected static Table TABLE;
053
054  protected static long UPPER = 1000;
055
056  protected static int THREADS = 10;
057
058  @BeforeClass
059  public static void setUp() throws Exception {
060    UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
061    UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
062    UTIL.getConfiguration().setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
063      0.014);
064    UTIL.startMiniCluster(3);
065  }
066
067  @AfterClass
068  public static void tearDown() throws Exception {
069    if (TABLE != null) {
070      TABLE.close();
071    }
072    UTIL.shutdownMiniCluster();
073  }
074
075  private static void increment(int sleepSteps) throws IOException {
076    for (long i = 1; i <= UPPER; i++) {
077      TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
078      if (sleepSteps > 0 && i % sleepSteps == 0) {
079        try {
080          Thread.sleep(10);
081        } catch (InterruptedException e) {
082          throw (IOException) new InterruptedIOException().initCause(e);
083        }
084      }
085    }
086  }
087
088  protected final void assertSum() throws IOException {
089    Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
090    assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
091    assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
092  }
093
094  protected final void doIncrement(int sleepSteps) throws InterruptedException {
095    Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
096      try {
097        increment(sleepSteps);
098      } catch (IOException e) {
099        throw new UncheckedIOException(e);
100      }
101    }, "increment-" + i)).toArray(Thread[]::new);
102    for (Thread thread : threads) {
103      thread.start();
104    }
105    for (Thread thread : threads) {
106      thread.join();
107    }
108  }
109}