/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.throttle;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ RegionServerTests.class, LargeTests.class })
public class TestCompactionWithThroughputController {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactionWithThroughputController.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestCompactionWithThroughputController.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final double EPSILON = 1E-6;

  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());

  private final byte[] family = Bytes.toBytes("f");

  private final byte[] qualifier = Bytes.toBytes("q");

  private HStore getStoreWithName(TableName tableName) {
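    // Walk the mini cluster's region servers and return the first store of the
    // given table. The tests below use a single-region table with one column
    // family, so this finds the only store.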
    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
    for (JVMClusterUtil.RegionServerThread rst : rsts) {
      HRegionServer hrs = rst.getRegionServer();
      for (Region region : hrs.getRegions(tableName)) {
        return ((HRegion) region).getStores().iterator().next();
      }
    }
    return null;
  }

  private HStore prepareData() throws IOException {
    Admin admin = TEST_UTIL.getAdmin();
    if (admin.tableExists(tableName)) {
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }
    Table table = TEST_UTIL.createTable(tableName, family);
    // Write 10 batches of 10 rows, each carrying a random 128 KB value, flushing
    // after every batch so the store ends up with 10 store files.
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 10; j++) {
        byte[] value = new byte[128 * 1024];
        ThreadLocalRandom.current().nextBytes(value);
        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
      }
      admin.flush(tableName);
    }
    return getStoreWithName(tableName);
  }

  private long testCompactionWithThroughputLimit() throws Exception {
    long throughputLimit = 1024L * 1024;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
    // Pin both tuning bounds to the same value so the controller cannot adjust
    // the limit away from 1 MB/s during the test.
    conf.setLong(
      PressureAwareCompactionThroughputController
        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
      throughputLimit);
    conf.setLong(
      PressureAwareCompactionThroughputController
        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
      throughputLimit);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      PressureAwareCompactionThroughputController.class.getName());
    TEST_UTIL.startMiniCluster(1);
    try {
      HStore store = prepareData();
      assertEquals(10, store.getStorefilesCount());
      long startTime = System.currentTimeMillis();
      TEST_UTIL.getAdmin().majorCompact(tableName);
      while (store.getStorefilesCount() != 1) {
        Thread.sleep(20);
      }
      long duration = System.currentTimeMillis() - startTime;
      double throughput = (double) store.getStorefilesSize() / duration * 1000;
      // Confirm that the speed limit works properly (not too fast, and not too
      // slow); 20% is the maximum acceptable error rate.
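      // prepareData wrote 100 values of 128 KB (about 12.5 MB), so at the pinned
      // 1 MB/s limit the major compaction should take roughly 12-13 seconds.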
      assertTrue(throughput < throughputLimit * 1.2);
      assertTrue(throughput > throughputLimit * 0.8);
      return System.currentTimeMillis() - startTime;
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  private long testCompactionWithoutThroughputLimit() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      NoLimitThroughputController.class.getName());
    TEST_UTIL.startMiniCluster(1);
    try {
      HStore store = prepareData();
      assertEquals(10, store.getStorefilesCount());
      long startTime = System.currentTimeMillis();
      TEST_UTIL.getAdmin().majorCompact(tableName);
      while (store.getStorefilesCount() != 1) {
        Thread.sleep(20);
      }
      return System.currentTimeMillis() - startTime;
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testCompaction() throws Exception {
    long limitTime = testCompactionWithThroughputLimit();
    long noLimitTime = testCompactionWithoutThroughputLimit();
    LOG.info("With 1M/s limit, compaction took " + limitTime
      + "ms; without limit, compaction took " + noLimitTime + "ms");
    // An unthrottled compaction usually runs at 40 MB/s or more, so requiring the
    // throttled run to take at least twice as long is a very weak assertion.
    assertTrue(limitTime > noLimitTime * 2);
  }

  /**
   * Test the tuning task of {@link PressureAwareCompactionThroughputController}.
   */
  @Test
  public void testThroughputTuning() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setLong(
      PressureAwareCompactionThroughputController
        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
      20L * 1024 * 1024);
    conf.setLong(
      PressureAwareCompactionThroughputController
        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
      10L * 1024 * 1024);
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      PressureAwareCompactionThroughputController.class.getName());
    conf.setInt(
      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
      1000);
    TEST_UTIL.startMiniCluster(1);
    Connection conn = ConnectionFactory.createConnection(conf);
    try {
      TEST_UTIL.getAdmin()
        .createTable(TableDescriptorBuilder.newBuilder(tableName)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
          .build());
      TEST_UTIL.waitTableAvailable(tableName);
      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
      PressureAwareCompactionThroughputController throughputController =
        (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
          .getCompactionThroughputController();
      // With no store files there is no compaction pressure, so the controller
      // starts at the lower bound.
      assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
      Table table = conn.getTable(tableName);
      // Compaction is disabled on the table, so each flush adds one store file.
      for (int i = 0; i < 5; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, new byte[0]));
        TEST_UTIL.flush(tableName);
      }
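      // 5 store files with min files to compact 4 and blocking at 6 gives a
      // pressure of (5 - 4) / (6 - 4) = 0.5, so after the 1s tune period the max
      // throughput should sit halfway between the 10 MB/s and 20 MB/s bounds.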
      Thread.sleep(2000);
      assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);

      // A 6th file reaches the blocking count: pressure is 1.0, so the max
      // throughput should be tuned up to the 20 MB/s higher bound.
      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);

      // A 7th file pushes the pressure above 1.0; the controller then stops
      // throttling compactions entirely.
      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON);

      // Switching to NoLimitThroughputController via a configuration change should
      // stop the old controller and install the new one on the compact/split thread.
      conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
        NoLimitThroughputController.class.getName());
      regionServer.compactSplitThread.onConfigurationChange(conf);
      assertTrue(throughputController.isStopped());
      assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
        instanceof NoLimitThroughputController);
    } finally {
      conn.close();
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Test the logic by which we calculate compaction pressure for a striped store.
   */
  @Test
  public void testGetCompactionPressureForStripedStore() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
    TEST_UTIL.startMiniCluster(1);
    Connection conn = ConnectionFactory.createConnection(conf);
    try {
      TEST_UTIL.getAdmin()
        .createTable(TableDescriptorBuilder.newBuilder(tableName)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
          .build());
      TEST_UTIL.waitTableAvailable(tableName);
      HStore store = getStoreWithName(tableName);
      assertEquals(0, store.getStorefilesCount());
      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
      Table table = conn.getTable(tableName);
      // Each iteration writes one row into each of the two stripes and flushes;
      // with flush-to-L0 disabled every flush produces two store files, giving
      // 8 files in total, i.e. 4 per stripe, exactly the minimum compaction file
      // count, hence zero pressure.
      for (int i = 0; i < 4; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, new byte[0]));
        table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, new byte[0]));
        TEST_UTIL.flush(tableName);
      }
      assertEquals(8, store.getStorefilesCount());
      assertEquals(0.0, store.getCompactionPressure(), EPSILON);

      // 10 files, 5 per stripe: (5 - 4) / (12 / 2 - 4) = 0.5.
      table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(10, store.getStorefilesCount());
      assertEquals(0.5, store.getCompactionPressure(), EPSILON);

      // 12 files, 6 per stripe: exactly the per-stripe blocking count, pressure 1.0.
      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(12, store.getStorefilesCount());
      assertEquals(1.0, store.getCompactionPressure(), EPSILON);
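      // Beyond the blocking total the per-stripe formula no longer applies: it
      // would give (7 - 4) / (6 - 4) = 1.5, but once the overall file count
      // exceeds the blocking limit (12) the striped store reports 2.0 to signal
      // that writers are blocked, which is what the assertion below expects.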
      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(14, store.getStorefilesCount());
      assertEquals(2.0, store.getCompactionPressure(), EPSILON);
    } finally {
      conn.close();
      TEST_UTIL.shutdownMiniCluster();
    }
  }
}