001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND; 021import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND; 022import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD; 023import static org.junit.jupiter.api.Assertions.assertEquals; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025 026import java.io.IOException; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Admin; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Connection; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 039import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; 040import org.apache.hadoop.hbase.regionserver.HRegion; 041import org.apache.hadoop.hbase.regionserver.HRegionServer; 042import org.apache.hadoop.hbase.regionserver.HStore; 043import org.apache.hadoop.hbase.regionserver.Region; 044import org.apache.hadoop.hbase.regionserver.StoreEngine; 045import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; 046import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; 047import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 048import org.apache.hadoop.hbase.testclassification.LargeTests; 049import org.apache.hadoop.hbase.testclassification.RegionServerTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.hadoop.hbase.util.JVMClusterUtil; 053import org.junit.jupiter.api.Tag; 054import org.junit.jupiter.api.Test; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058@Tag(RegionServerTests.TAG) 059@Tag(LargeTests.TAG) 060public class TestCompactionWithThroughputController { 061 062 private static final Logger LOG = 063 LoggerFactory.getLogger(TestCompactionWithThroughputController.class); 064 065 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 066 067 private static final double EPSILON = 1E-6; 068 069 private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); 070 071 private final byte[] family = Bytes.toBytes("f"); 072 073 private final byte[] qualifier = Bytes.toBytes("q"); 074 075 private HStore getStoreWithName(TableName tableName) { 076 SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 077 List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); 078 for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { 079 HRegionServer hrs = rsts.get(i).getRegionServer(); 080 for (Region region : hrs.getRegions(tableName)) { 081 return ((HRegion) region).getStores().iterator().next(); 082 } 083 } 084 return null; 085 } 086 087 private HStore prepareData() throws IOException { 088 Admin admin = TEST_UTIL.getAdmin(); 089 if (admin.tableExists(tableName)) { 090 admin.disableTable(tableName); 091 admin.deleteTable(tableName); 092 } 093 Table table = TEST_UTIL.createTable(tableName, family); 094 for (int i = 0; i < 10; i++) { 095 for (int j = 0; j < 10; j++) { 096 byte[] value = new byte[128 * 1024]; 097 Bytes.random(value); 098 table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); 099 } 100 admin.flush(tableName); 101 } 102 return getStoreWithName(tableName); 103 } 104 105 private long testCompactionWithThroughputLimit() throws Exception { 106 long throughputLimit = 1024L * 1024; 107 Configuration conf = TEST_UTIL.getConfiguration(); 108 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); 109 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); 110 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); 111 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); 112 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, throughputLimit); 113 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, throughputLimit); 114 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 115 PressureAwareCompactionThroughputController.class.getName()); 116 TEST_UTIL.startMiniCluster(1); 117 try { 118 HStore store = prepareData(); 119 assertEquals(10, store.getStorefilesCount()); 120 long startTime = EnvironmentEdgeManager.currentTime(); 121 TEST_UTIL.getAdmin().majorCompact(tableName); 122 while (store.getStorefilesCount() != 1) { 123 Thread.sleep(20); 124 } 125 long duration = EnvironmentEdgeManager.currentTime() - startTime; 126 double throughput = (double) store.getStorefilesSize() / duration * 1000; 127 // confirm that the speed limit work properly(not too fast, and also not too slow) 128 // 20% is the max acceptable error rate. 129 assertTrue(throughput < throughputLimit * 1.2); 130 assertTrue(throughput > throughputLimit * 0.8); 131 return EnvironmentEdgeManager.currentTime() - startTime; 132 } finally { 133 TEST_UTIL.shutdownMiniCluster(); 134 } 135 } 136 137 private long testCompactionWithoutThroughputLimit() throws Exception { 138 Configuration conf = TEST_UTIL.getConfiguration(); 139 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); 140 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); 141 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); 142 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); 143 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 144 NoLimitThroughputController.class.getName()); 145 TEST_UTIL.startMiniCluster(1); 146 try { 147 HStore store = prepareData(); 148 assertEquals(10, store.getStorefilesCount()); 149 long startTime = EnvironmentEdgeManager.currentTime(); 150 TEST_UTIL.getAdmin().majorCompact(tableName); 151 while (store.getStorefilesCount() != 1) { 152 Thread.sleep(20); 153 } 154 return EnvironmentEdgeManager.currentTime() - startTime; 155 } finally { 156 TEST_UTIL.shutdownMiniCluster(); 157 } 158 } 159 160 @Test 161 public void testCompaction() throws Exception { 162 long limitTime = testCompactionWithThroughputLimit(); 163 long noLimitTime = testCompactionWithoutThroughputLimit(); 164 LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use " 165 + noLimitTime + "ms"); 166 // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this 167 // is a very weak assumption. 168 assertTrue(limitTime > noLimitTime * 2); 169 } 170 171 /** 172 * Test the tuning task of {@link PressureAwareCompactionThroughputController} 173 */ 174 @Test 175 public void testThroughputTuning() throws Exception { 176 Configuration conf = TEST_UTIL.getConfiguration(); 177 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); 178 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, 20L * 1024 * 1024); 179 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, 10L * 1024 * 1024); 180 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4); 181 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6); 182 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 183 PressureAwareCompactionThroughputController.class.getName()); 184 conf.setInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, 1000); 185 TEST_UTIL.startMiniCluster(1); 186 Connection conn = ConnectionFactory.createConnection(conf); 187 try { 188 TEST_UTIL.getAdmin() 189 .createTable(TableDescriptorBuilder.newBuilder(tableName) 190 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false) 191 .build()); 192 TEST_UTIL.waitTableAvailable(tableName); 193 HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); 194 PressureAwareCompactionThroughputController throughputController = 195 (PressureAwareCompactionThroughputController) regionServer.getCompactSplitThread() 196 .getCompactionThroughputController(); 197 assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); 198 Table table = conn.getTable(tableName); 199 for (int i = 0; i < 5; i++) { 200 byte[] value = new byte[0]; 201 table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value)); 202 TEST_UTIL.flush(tableName); 203 } 204 Thread.sleep(2000); 205 assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); 206 207 byte[] value1 = new byte[0]; 208 table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1)); 209 TEST_UTIL.flush(tableName); 210 Thread.sleep(2000); 211 assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); 212 213 byte[] value = new byte[0]; 214 table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value)); 215 TEST_UTIL.flush(tableName); 216 Thread.sleep(2000); 217 assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON); 218 219 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 220 NoLimitThroughputController.class.getName()); 221 regionServer.getCompactSplitThread().onConfigurationChange(conf); 222 assertTrue(throughputController.isStopped()); 223 assertTrue(regionServer.getCompactSplitThread() 224 .getCompactionThroughputController() instanceof NoLimitThroughputController); 225 } finally { 226 conn.close(); 227 TEST_UTIL.shutdownMiniCluster(); 228 } 229 } 230 231 /** 232 * Test the logic that we calculate compaction pressure for a striped store. 233 */ 234 @Test 235 public void testGetCompactionPressureForStripedStore() throws Exception { 236 Configuration conf = TEST_UTIL.getConfiguration(); 237 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); 238 conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false); 239 conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2); 240 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4); 241 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12); 242 TEST_UTIL.startMiniCluster(1); 243 Connection conn = ConnectionFactory.createConnection(conf); 244 try { 245 TEST_UTIL.getAdmin() 246 .createTable(TableDescriptorBuilder.newBuilder(tableName) 247 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false) 248 .build()); 249 TEST_UTIL.waitTableAvailable(tableName); 250 HStore store = getStoreWithName(tableName); 251 assertEquals(0, store.getStorefilesCount()); 252 assertEquals(0.0, store.getCompactionPressure(), EPSILON); 253 Table table = conn.getTable(tableName); 254 for (int i = 0; i < 4; i++) { 255 byte[] value1 = new byte[0]; 256 table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value1)); 257 byte[] value = new byte[0]; 258 table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value)); 259 TEST_UTIL.flush(tableName); 260 } 261 assertEquals(8, store.getStorefilesCount()); 262 assertEquals(0.0, store.getCompactionPressure(), EPSILON); 263 264 byte[] value5 = new byte[0]; 265 table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value5)); 266 byte[] value4 = new byte[0]; 267 table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value4)); 268 TEST_UTIL.flush(tableName); 269 assertEquals(10, store.getStorefilesCount()); 270 assertEquals(0.5, store.getCompactionPressure(), EPSILON); 271 272 byte[] value3 = new byte[0]; 273 table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value3)); 274 byte[] value2 = new byte[0]; 275 table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value2)); 276 TEST_UTIL.flush(tableName); 277 assertEquals(12, store.getStorefilesCount()); 278 assertEquals(1.0, store.getCompactionPressure(), EPSILON); 279 280 byte[] value1 = new byte[0]; 281 table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value1)); 282 byte[] value = new byte[0]; 283 table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value)); 284 TEST_UTIL.flush(tableName); 285 assertEquals(14, store.getStorefilesCount()); 286 assertEquals(2.0, store.getCompactionPressure(), EPSILON); 287 } finally { 288 conn.close(); 289 TEST_UTIL.shutdownMiniCluster(); 290 } 291 } 292}