/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.throttle;

import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND;
import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND;
import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ RegionServerTests.class, LargeTests.class })
public class TestCompactionWithThroughputController {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactionWithThroughputController.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestCompactionWithThroughputController.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final double EPSILON = 1E-6;

  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());

  private final byte[] family = Bytes.toBytes("f");

  private final byte[] qualifier = Bytes.toBytes("q");

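  /**
   * Returns the first store of the given table found on any region server in the mini cluster, or
   * null if the table has no online region.
   */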
  private HStore getStoreWithName(TableName tableName) {
    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
    for (JVMClusterUtil.RegionServerThread rst : rsts) {
      HRegionServer hrs = rst.getRegionServer();
      for (Region region : hrs.getRegions(tableName)) {
        return ((HRegion) region).getStores().iterator().next();
      }
    }
    return null;
  }

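  /**
   * Writes 100 rows of 128KB random values, flushing after every 10 rows, so the table's single
   * region ends up with 10 store files of roughly 1.28MB each.
   */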
  private HStore prepareData() throws IOException {
    Admin admin = TEST_UTIL.getAdmin();
    if (admin.tableExists(tableName)) {
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }
    Table table = TEST_UTIL.createTable(tableName, family);
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 10; j++) {
        byte[] value = new byte[128 * 1024];
        Bytes.random(value);
        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
      }
      admin.flush(tableName);
    }
    return getStoreWithName(tableName);
  }

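  /**
   * Major-compacts the 10 store files with the pressure-aware controller pinned to 1MB/s (both
   * tuning bounds set to the same value) and asserts that the observed throughput stays within 20%
   * of the limit. Returns the compaction duration in milliseconds.
   */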
  private long testCompactionWithThroughputLimit() throws Exception {
    long throughputLimit = 1024L * 1024;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, throughputLimit);
    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, throughputLimit);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      PressureAwareCompactionThroughputController.class.getName());
    TEST_UTIL.startMiniCluster(1);
    try {
      HStore store = prepareData();
      assertEquals(10, store.getStorefilesCount());
      long startTime = EnvironmentEdgeManager.currentTime();
      TEST_UTIL.getAdmin().majorCompact(tableName);
      while (store.getStorefilesCount() != 1) {
        Thread.sleep(20);
      }
      long duration = EnvironmentEdgeManager.currentTime() - startTime;
      double throughput = (double) store.getStorefilesSize() / duration * 1000;
      // Confirm that the speed limit works properly (not too fast and not too slow).
      // 20% is the max acceptable error rate.
      assertTrue(throughput < throughputLimit * 1.2);
      assertTrue(throughput > throughputLimit * 0.8);
      return duration;
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

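  /**
   * Runs the same major compaction with NoLimitThroughputController and returns its duration in
   * milliseconds, as a baseline for the throttled run.
   */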
  private long testCompactionWithoutThroughputLimit() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      NoLimitThroughputController.class.getName());
    TEST_UTIL.startMiniCluster(1);
    try {
      HStore store = prepareData();
      assertEquals(10, store.getStorefilesCount());
      long startTime = EnvironmentEdgeManager.currentTime();
      TEST_UTIL.getAdmin().majorCompact(tableName);
      while (store.getStorefilesCount() != 1) {
        Thread.sleep(20);
      }
      return EnvironmentEdgeManager.currentTime() - startTime;
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

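  /**
   * Compares the duration of a compaction throttled to 1MB/s against an unthrottled one; the
   * throttled run should take at least twice as long.
   */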
  @Test
  public void testCompaction() throws Exception {
    long limitTime = testCompactionWithThroughputLimit();
    long noLimitTime = testCompactionWithoutThroughputLimit();
    LOG.info("With 1MB/s limit, compaction took " + limitTime + "ms; without limit, compaction "
      + "took " + noLimitTime + "ms");
    // The throughput of an unthrottled compaction is usually at least 40MB/sec, so this is a very
    // weak assertion.
    assertTrue(limitTime > noLimitTime * 2);
  }


  /**
   * Test the tuning task of {@link PressureAwareCompactionThroughputController}.
   */
  @Test
  public void testThroughputTuning() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, 20L * 1024 * 1024);
    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, 10L * 1024 * 1024);
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      PressureAwareCompactionThroughputController.class.getName());
    conf.setInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, 1000);
    TEST_UTIL.startMiniCluster(1);
    Connection conn = ConnectionFactory.createConnection(conf);
    try {
      TEST_UTIL.getAdmin()
        .createTable(TableDescriptorBuilder.newBuilder(tableName)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
          .build());
      TEST_UTIL.waitTableAvailable(tableName);
      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
      PressureAwareCompactionThroughputController throughputController =
        (PressureAwareCompactionThroughputController) regionServer.getCompactSplitThread()
          .getCompactionThroughputController();
      // No store files yet, so the compaction pressure is 0 and the limit starts at the 10MB/s
      // lower bound.
      assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
      Table table = conn.getTable(tableName);
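      // Create 5 store files. With a compaction min of 4 and a blocking count of 6, the pressure
      // is (5 - 4) / (6 - 4) = 0.5, so the tuned limit should sit halfway between the bounds:
      // 10MB/s + 0.5 * (20MB/s - 10MB/s) = 15MB/s. The 2-second sleeps below give the 1-second
      // tune chore time to run.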
      byte[] value = new byte[0];
      for (int i = 0; i < 5; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value));
        TEST_UTIL.flush(tableName);
      }
      Thread.sleep(2000);
      assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);

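      // A 6th store file raises the pressure to (6 - 4) / (6 - 4) = 1.0, tuning the limit up to
      // the 20MB/s upper bound.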
      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);

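      // A 7th store file pushes the pressure above 1.0 (the blocking file count is reached), so
      // the controller removes the throughput limit entirely.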
      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON);

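      // Switching the configured controller class at runtime should stop the old controller and
      // replace it with a NoLimitThroughputController instance.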
      conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
        NoLimitThroughputController.class.getName());
      regionServer.getCompactSplitThread().onConfigurationChange(conf);
      assertTrue(throughputController.isStopped());
      assertTrue(regionServer.getCompactSplitThread()
        .getCompactionThroughputController() instanceof NoLimitThroughputController);
    } finally {
      conn.close();
      TEST_UTIL.shutdownMiniCluster();
    }
  }


  /**
   * Test the logic for calculating the compaction pressure of a stripe store.
   */
  @Test
  public void testGetCompactionPressureForStripedStore() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
    TEST_UTIL.startMiniCluster(1);
    Connection conn = ConnectionFactory.createConnection(conf);
    try {
      TEST_UTIL.getAdmin()
        .createTable(TableDescriptorBuilder.newBuilder(tableName)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
          .build());
      TEST_UTIL.waitTableAvailable(tableName);
      HStore store = getStoreWithName(tableName);
      assertEquals(0, store.getStorefilesCount());
      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
      Table table = conn.getTable(tableName);
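      // With flush-to-L0 disabled and 2 initial stripes, each flush writes one file per stripe.
      // Four rounds leave 4 files per stripe, which equals the stripe compaction min of 4, so the
      // compaction pressure stays at 0.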
      byte[] value = new byte[0];
      for (int i = 0; i < 4; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value));
        table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value));
        TEST_UTIL.flush(tableName);
      }
      assertEquals(8, store.getStorefilesCount());
      assertEquals(0.0, store.getCompactionPressure(), EPSILON);

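      // A 5th file per stripe gives a pressure of (5 - 4) / (12 / 2 - 4) = 0.5; the blocking count
      // of 12 is split evenly across the two stripes.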
      table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value));
      table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value));
      TEST_UTIL.flush(tableName);
      assertEquals(10, store.getStorefilesCount());
      assertEquals(0.5, store.getCompactionPressure(), EPSILON);

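      // A 6th file per stripe reaches the per-stripe blocking count (12 / 2 = 6), but the reported
      // pressure is capped at 1.0 until the total blocking count is actually exceeded.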
      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value));
      table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value));
      TEST_UTIL.flush(tableName);
      assertEquals(12, store.getStorefilesCount());
      assertEquals(1.0, store.getCompactionPressure(), EPSILON);

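      // 14 files exceed the blocking count of 12, so the stripe store reports 2.0 to signal that
      // the blocking file count has been reached.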
      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value));
      table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value));
      TEST_UTIL.flush(tableName);
      assertEquals(14, store.getStorefilesCount());
      assertEquals(2.0, store.getCompactionPressure(), EPSILON);
    } finally {
      conn.close();
      TEST_UTIL.shutdownMiniCluster();
    }
  }
}