001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.throttle;
019
020import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND;
021import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND;
022import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD;
023import static org.junit.jupiter.api.Assertions.assertEquals;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.IOException;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
039import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
040import org.apache.hadoop.hbase.regionserver.HRegion;
041import org.apache.hadoop.hbase.regionserver.HRegionServer;
042import org.apache.hadoop.hbase.regionserver.HStore;
043import org.apache.hadoop.hbase.regionserver.Region;
044import org.apache.hadoop.hbase.regionserver.StoreEngine;
045import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
046import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
047import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.testclassification.RegionServerTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.hadoop.hbase.util.JVMClusterUtil;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.Test;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058@Tag(RegionServerTests.TAG)
059@Tag(LargeTests.TAG)
060public class TestCompactionWithThroughputController {
061
062  private static final Logger LOG =
063    LoggerFactory.getLogger(TestCompactionWithThroughputController.class);
064
065  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
066
067  private static final double EPSILON = 1E-6;
068
069  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
070
071  private final byte[] family = Bytes.toBytes("f");
072
073  private final byte[] qualifier = Bytes.toBytes("q");
074
075  private HStore getStoreWithName(TableName tableName) {
076    SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
077    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
078    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
079      HRegionServer hrs = rsts.get(i).getRegionServer();
080      for (Region region : hrs.getRegions(tableName)) {
081        return ((HRegion) region).getStores().iterator().next();
082      }
083    }
084    return null;
085  }
086
087  private HStore prepareData() throws IOException {
088    Admin admin = TEST_UTIL.getAdmin();
089    if (admin.tableExists(tableName)) {
090      admin.disableTable(tableName);
091      admin.deleteTable(tableName);
092    }
093    Table table = TEST_UTIL.createTable(tableName, family);
094    for (int i = 0; i < 10; i++) {
095      for (int j = 0; j < 10; j++) {
096        byte[] value = new byte[128 * 1024];
097        Bytes.random(value);
098        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
099      }
100      admin.flush(tableName);
101    }
102    return getStoreWithName(tableName);
103  }
104
105  private long testCompactionWithThroughputLimit() throws Exception {
106    long throughputLimit = 1024L * 1024;
107    Configuration conf = TEST_UTIL.getConfiguration();
108    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
109    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
110    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
111    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
112    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, throughputLimit);
113    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, throughputLimit);
114    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
115      PressureAwareCompactionThroughputController.class.getName());
116    TEST_UTIL.startMiniCluster(1);
117    try {
118      HStore store = prepareData();
119      assertEquals(10, store.getStorefilesCount());
120      long startTime = EnvironmentEdgeManager.currentTime();
121      TEST_UTIL.getAdmin().majorCompact(tableName);
122      while (store.getStorefilesCount() != 1) {
123        Thread.sleep(20);
124      }
125      long duration = EnvironmentEdgeManager.currentTime() - startTime;
126      double throughput = (double) store.getStorefilesSize() / duration * 1000;
127      // confirm that the speed limit work properly(not too fast, and also not too slow)
128      // 20% is the max acceptable error rate.
129      assertTrue(throughput < throughputLimit * 1.2);
130      assertTrue(throughput > throughputLimit * 0.8);
131      return EnvironmentEdgeManager.currentTime() - startTime;
132    } finally {
133      TEST_UTIL.shutdownMiniCluster();
134    }
135  }
136
137  private long testCompactionWithoutThroughputLimit() throws Exception {
138    Configuration conf = TEST_UTIL.getConfiguration();
139    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
140    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
141    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
142    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
143    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
144      NoLimitThroughputController.class.getName());
145    TEST_UTIL.startMiniCluster(1);
146    try {
147      HStore store = prepareData();
148      assertEquals(10, store.getStorefilesCount());
149      long startTime = EnvironmentEdgeManager.currentTime();
150      TEST_UTIL.getAdmin().majorCompact(tableName);
151      while (store.getStorefilesCount() != 1) {
152        Thread.sleep(20);
153      }
154      return EnvironmentEdgeManager.currentTime() - startTime;
155    } finally {
156      TEST_UTIL.shutdownMiniCluster();
157    }
158  }
159
160  @Test
161  public void testCompaction() throws Exception {
162    long limitTime = testCompactionWithThroughputLimit();
163    long noLimitTime = testCompactionWithoutThroughputLimit();
164    LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use "
165      + noLimitTime + "ms");
166    // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this
167    // is a very weak assumption.
168    assertTrue(limitTime > noLimitTime * 2);
169  }
170
171  /**
172   * Test the tuning task of {@link PressureAwareCompactionThroughputController}
173   */
174  @Test
175  public void testThroughputTuning() throws Exception {
176    Configuration conf = TEST_UTIL.getConfiguration();
177    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
178    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, 20L * 1024 * 1024);
179    conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, 10L * 1024 * 1024);
180    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
181    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
182    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
183      PressureAwareCompactionThroughputController.class.getName());
184    conf.setInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, 1000);
185    TEST_UTIL.startMiniCluster(1);
186    Connection conn = ConnectionFactory.createConnection(conf);
187    try {
188      TEST_UTIL.getAdmin()
189        .createTable(TableDescriptorBuilder.newBuilder(tableName)
190          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
191          .build());
192      TEST_UTIL.waitTableAvailable(tableName);
193      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
194      PressureAwareCompactionThroughputController throughputController =
195        (PressureAwareCompactionThroughputController) regionServer.getCompactSplitThread()
196          .getCompactionThroughputController();
197      assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
198      Table table = conn.getTable(tableName);
199      for (int i = 0; i < 5; i++) {
200        byte[] value = new byte[0];
201        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value));
202        TEST_UTIL.flush(tableName);
203      }
204      Thread.sleep(2000);
205      assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
206
207      byte[] value1 = new byte[0];
208      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1));
209      TEST_UTIL.flush(tableName);
210      Thread.sleep(2000);
211      assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
212
213      byte[] value = new byte[0];
214      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value));
215      TEST_UTIL.flush(tableName);
216      Thread.sleep(2000);
217      assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON);
218
219      conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
220        NoLimitThroughputController.class.getName());
221      regionServer.getCompactSplitThread().onConfigurationChange(conf);
222      assertTrue(throughputController.isStopped());
223      assertTrue(regionServer.getCompactSplitThread()
224        .getCompactionThroughputController() instanceof NoLimitThroughputController);
225    } finally {
226      conn.close();
227      TEST_UTIL.shutdownMiniCluster();
228    }
229  }
230
231  /**
232   * Test the logic that we calculate compaction pressure for a striped store.
233   */
234  @Test
235  public void testGetCompactionPressureForStripedStore() throws Exception {
236    Configuration conf = TEST_UTIL.getConfiguration();
237    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
238    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
239    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
240    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
241    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
242    TEST_UTIL.startMiniCluster(1);
243    Connection conn = ConnectionFactory.createConnection(conf);
244    try {
245      TEST_UTIL.getAdmin()
246        .createTable(TableDescriptorBuilder.newBuilder(tableName)
247          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
248          .build());
249      TEST_UTIL.waitTableAvailable(tableName);
250      HStore store = getStoreWithName(tableName);
251      assertEquals(0, store.getStorefilesCount());
252      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
253      Table table = conn.getTable(tableName);
254      for (int i = 0; i < 4; i++) {
255        byte[] value1 = new byte[0];
256        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value1));
257        byte[] value = new byte[0];
258        table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value));
259        TEST_UTIL.flush(tableName);
260      }
261      assertEquals(8, store.getStorefilesCount());
262      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
263
264      byte[] value5 = new byte[0];
265      table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value5));
266      byte[] value4 = new byte[0];
267      table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value4));
268      TEST_UTIL.flush(tableName);
269      assertEquals(10, store.getStorefilesCount());
270      assertEquals(0.5, store.getCompactionPressure(), EPSILON);
271
272      byte[] value3 = new byte[0];
273      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value3));
274      byte[] value2 = new byte[0];
275      table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value2));
276      TEST_UTIL.flush(tableName);
277      assertEquals(12, store.getStorefilesCount());
278      assertEquals(1.0, store.getCompactionPressure(), EPSILON);
279
280      byte[] value1 = new byte[0];
281      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value1));
282      byte[] value = new byte[0];
283      table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value));
284      TEST_UTIL.flush(tableName);
285      assertEquals(14, store.getStorefilesCount());
286      assertEquals(2.0, store.getCompactionPressure(), EPSILON);
287    } finally {
288      conn.close();
289      TEST_UTIL.shutdownMiniCluster();
290    }
291  }
292}