001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.throttle;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.List;
025import java.util.concurrent.ThreadLocalRandom;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.MiniHBaseCluster;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
038import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
039import org.apache.hadoop.hbase.regionserver.HRegion;
040import org.apache.hadoop.hbase.regionserver.HRegionServer;
041import org.apache.hadoop.hbase.regionserver.HStore;
042import org.apache.hadoop.hbase.regionserver.Region;
043import org.apache.hadoop.hbase.regionserver.StoreEngine;
044import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
045import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
046import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.testclassification.RegionServerTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.JVMClusterUtil;
051import org.junit.ClassRule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057@Category({ RegionServerTests.class, MediumTests.class })
058public class TestCompactionWithThroughputController {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062      HBaseClassTestRule.forClass(TestCompactionWithThroughputController.class);
063
064  private static final Logger LOG =
065      LoggerFactory.getLogger(TestCompactionWithThroughputController.class);
066
067  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
068
069  private static final double EPSILON = 1E-6;
070
071  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
072
073  private final byte[] family = Bytes.toBytes("f");
074
075  private final byte[] qualifier = Bytes.toBytes("q");
076
077  private HStore getStoreWithName(TableName tableName) {
078    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
079    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
080    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
081      HRegionServer hrs = rsts.get(i).getRegionServer();
082      for (Region region : hrs.getRegions(tableName)) {
083        return ((HRegion) region).getStores().iterator().next();
084      }
085    }
086    return null;
087  }
088
089  private HStore prepareData() throws IOException {
090    Admin admin = TEST_UTIL.getAdmin();
091    if (admin.tableExists(tableName)) {
092      admin.disableTable(tableName);
093      admin.deleteTable(tableName);
094    }
095    Table table = TEST_UTIL.createTable(tableName, family);
096    for (int i = 0; i < 10; i++) {
097      for (int j = 0; j < 10; j++) {
098        byte[] value = new byte[128 * 1024];
099        ThreadLocalRandom.current().nextBytes(value);
100        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
101      }
102      admin.flush(tableName);
103    }
104    return getStoreWithName(tableName);
105  }
106
107  private long testCompactionWithThroughputLimit() throws Exception {
108    long throughputLimit = 1024L * 1024;
109    Configuration conf = TEST_UTIL.getConfiguration();
110    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
111    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
112    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
113    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
114    conf.setLong(
115      PressureAwareCompactionThroughputController
116        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
117      throughputLimit);
118    conf.setLong(
119      PressureAwareCompactionThroughputController
120        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
121      throughputLimit);
122    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
123      PressureAwareCompactionThroughputController.class.getName());
124    TEST_UTIL.startMiniCluster(1);
125    try {
126      HStore store = prepareData();
127      assertEquals(10, store.getStorefilesCount());
128      long startTime = System.currentTimeMillis();
129      TEST_UTIL.getAdmin().majorCompact(tableName);
130      while (store.getStorefilesCount() != 1) {
131        Thread.sleep(20);
132      }
133      long duration = System.currentTimeMillis() - startTime;
134      double throughput = (double) store.getStorefilesSize() / duration * 1000;
135      // confirm that the speed limit work properly(not too fast, and also not too slow)
136      // 20% is the max acceptable error rate.
137      assertTrue(throughput < throughputLimit * 1.2);
138      assertTrue(throughput > throughputLimit * 0.8);
139      return System.currentTimeMillis() - startTime;
140    } finally {
141      TEST_UTIL.shutdownMiniCluster();
142    }
143  }
144
145  private long testCompactionWithoutThroughputLimit() throws Exception {
146    Configuration conf = TEST_UTIL.getConfiguration();
147    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
148    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
149    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
150    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
151    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
152      NoLimitThroughputController.class.getName());
153    TEST_UTIL.startMiniCluster(1);
154    try {
155      HStore store = prepareData();
156      assertEquals(10, store.getStorefilesCount());
157      long startTime = System.currentTimeMillis();
158      TEST_UTIL.getAdmin().majorCompact(tableName);
159      while (store.getStorefilesCount() != 1) {
160        Thread.sleep(20);
161      }
162      return System.currentTimeMillis() - startTime;
163    } finally {
164      TEST_UTIL.shutdownMiniCluster();
165    }
166  }
167
168  @Test
169  public void testCompaction() throws Exception {
170    long limitTime = testCompactionWithThroughputLimit();
171    long noLimitTime = testCompactionWithoutThroughputLimit();
172    LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use "
173        + noLimitTime + "ms");
174    // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this
175    // is a very weak assumption.
176    assertTrue(limitTime > noLimitTime * 2);
177  }
178
179  /**
180   * Test the tuning task of {@link PressureAwareCompactionThroughputController}
181   */
182  @Test
183  public void testThroughputTuning() throws Exception {
184    Configuration conf = TEST_UTIL.getConfiguration();
185    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
186    conf.setLong(
187      PressureAwareCompactionThroughputController
188        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
189      20L * 1024 * 1024);
190    conf.setLong(
191      PressureAwareCompactionThroughputController
192        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
193      10L * 1024 * 1024);
194    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
195    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
196    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
197      PressureAwareCompactionThroughputController.class.getName());
198    conf.setInt(
199      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
200      1000);
201    TEST_UTIL.startMiniCluster(1);
202    Connection conn = ConnectionFactory.createConnection(conf);
203    try {
204      TEST_UTIL.getAdmin()
205          .createTable(TableDescriptorBuilder.newBuilder(tableName)
206              .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
207              .build());
208      TEST_UTIL.waitTableAvailable(tableName);
209      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
210      PressureAwareCompactionThroughputController throughputController =
211          (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
212              .getCompactionThroughputController();
213      assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
214      Table table = conn.getTable(tableName);
215      for (int i = 0; i < 5; i++) {
216        byte[] value = new byte[0];
217        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value));
218        TEST_UTIL.flush(tableName);
219      }
220      Thread.sleep(2000);
221      assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
222
223      byte[] value1 = new byte[0];
224      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1));
225      TEST_UTIL.flush(tableName);
226      Thread.sleep(2000);
227      assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
228
229      byte[] value = new byte[0];
230      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value));
231      TEST_UTIL.flush(tableName);
232      Thread.sleep(2000);
233      assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON);
234
235      conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
236        NoLimitThroughputController.class.getName());
237      regionServer.compactSplitThread.onConfigurationChange(conf);
238      assertTrue(throughputController.isStopped());
239      assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
240        instanceof NoLimitThroughputController);
241    } finally {
242      conn.close();
243      TEST_UTIL.shutdownMiniCluster();
244    }
245  }
246
247  /**
248   * Test the logic that we calculate compaction pressure for a striped store.
249   */
250  @Test
251  public void testGetCompactionPressureForStripedStore() throws Exception {
252    Configuration conf = TEST_UTIL.getConfiguration();
253    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
254    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
255    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
256    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
257    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
258    TEST_UTIL.startMiniCluster(1);
259    Connection conn = ConnectionFactory.createConnection(conf);
260    try {
261      TEST_UTIL.getAdmin()
262          .createTable(TableDescriptorBuilder.newBuilder(tableName)
263              .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
264              .build());
265      TEST_UTIL.waitTableAvailable(tableName);
266      HStore store = getStoreWithName(tableName);
267      assertEquals(0, store.getStorefilesCount());
268      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
269      Table table = conn.getTable(tableName);
270      for (int i = 0; i < 4; i++) {
271        byte[] value1 = new byte[0];
272        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value1));
273        byte[] value = new byte[0];
274        table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value));
275        TEST_UTIL.flush(tableName);
276      }
277      assertEquals(8, store.getStorefilesCount());
278      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
279
280      byte[] value5 = new byte[0];
281      table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value5));
282      byte[] value4 = new byte[0];
283      table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value4));
284      TEST_UTIL.flush(tableName);
285      assertEquals(10, store.getStorefilesCount());
286      assertEquals(0.5, store.getCompactionPressure(), EPSILON);
287
288      byte[] value3 = new byte[0];
289      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value3));
290      byte[] value2 = new byte[0];
291      table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value2));
292      TEST_UTIL.flush(tableName);
293      assertEquals(12, store.getStorefilesCount());
294      assertEquals(1.0, store.getCompactionPressure(), EPSILON);
295
296      byte[] value1 = new byte[0];
297      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value1));
298      byte[] value = new byte[0];
299      table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value));
300      TEST_UTIL.flush(tableName);
301      assertEquals(14, store.getStorefilesCount());
302      assertEquals(2.0, store.getCompactionPressure(), EPSILON);
303    } finally {
304      conn.close();
305      TEST_UTIL.shutdownMiniCluster();
306    }
307  }
308}