001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022import static org.junit.jupiter.api.Assertions.fail; 023 024import java.io.IOException; 025import java.util.List; 026import java.util.concurrent.TimeUnit; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.ConnectionFactory; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 037import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; 038import org.apache.hadoop.hbase.regionserver.HRegion; 039import org.apache.hadoop.hbase.regionserver.HRegionServer; 040import org.apache.hadoop.hbase.regionserver.HStore; 041import org.apache.hadoop.hbase.regionserver.Region; 042import org.apache.hadoop.hbase.regionserver.StoreEngine; 043import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; 044import org.apache.hadoop.hbase.testclassification.LargeTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.JVMClusterUtil; 047import org.apache.hadoop.hbase.util.Pair; 048import org.junit.jupiter.api.AfterEach; 049import org.junit.jupiter.api.BeforeEach; 050import org.junit.jupiter.api.Tag; 051import org.junit.jupiter.api.Test; 052import org.junit.jupiter.api.TestInfo; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056@Tag(LargeTests.TAG) 057public class TestFlushWithThroughputController { 058 059 private static final Logger LOG = 060 LoggerFactory.getLogger(TestFlushWithThroughputController.class); 061 private static final double EPSILON = 1.3E-6; 062 063 private HBaseTestingUtil hbtu; 064 private String testName; 065 private TableName tableName; 066 private final byte[] family = Bytes.toBytes("f"); 067 private final byte[] qualifier = Bytes.toBytes("q"); 068 069 @BeforeEach 070 public void setUp(TestInfo testInfo) { 071 testName = testInfo.getTestMethod().get().getName(); 072 hbtu = new HBaseTestingUtil(); 073 tableName = TableName.valueOf("Table-" + testName); 074 hbtu.getConfiguration().set( 075 FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, 076 PressureAwareFlushThroughputController.class.getName()); 077 } 078 079 @AfterEach 080 public void tearDown() throws Exception { 081 hbtu.shutdownMiniCluster(); 082 } 083 084 private HStore getStoreWithName(TableName tableName) { 085 SingleProcessHBaseCluster cluster = hbtu.getMiniHBaseCluster(); 086 List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); 087 for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { 088 HRegionServer hrs = rsts.get(i).getRegionServer(); 089 for (Region region : hrs.getRegions(tableName)) { 090 return ((HRegion) region).getStores().iterator().next(); 091 } 092 } 093 return null; 094 } 095 096 private void setMaxMinThroughputs(long max, long min) { 097 Configuration conf = hbtu.getConfiguration(); 098 conf.setLong( 099 PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, min); 100 conf.setLong( 101 PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, max); 102 } 103 104 /** 105 * Writes Puts to the table and flushes few times. 106 * @return {@link Pair} of (throughput, duration). 107 */ 108 private Pair<Double, Long> generateAndFlushData(Table table) throws IOException { 109 // Internally, throughput is controlled after every cell write, so keep value size less for 110 // better control. 111 final int NUM_FLUSHES = 3, NUM_PUTS = 50, VALUE_SIZE = 200 * 1024; 112 long duration = 0; 113 for (int i = 0; i < NUM_FLUSHES; i++) { 114 // Write about 10M (10 times of throughput rate) per iteration. 115 for (int j = 0; j < NUM_PUTS; j++) { 116 byte[] value = new byte[VALUE_SIZE]; 117 Bytes.random(value); 118 table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); 119 } 120 long startTime = System.nanoTime(); 121 hbtu.getHBaseCluster().getRegions(tableName).stream().findFirst().ifPresent(r -> { 122 try { 123 r.flush(true); 124 } catch (IOException e) { 125 LOG.error("Failed flush region {}", r, e); 126 fail("Failed flush region " + r.getRegionInfo().getRegionNameAsString()); 127 } 128 }); 129 duration += System.nanoTime() - startTime; 130 } 131 HStore store = getStoreWithName(tableName); 132 assertEquals(NUM_FLUSHES, store.getStorefilesCount()); 133 double throughput = 134 (double) store.getStorefilesSize() / TimeUnit.NANOSECONDS.toSeconds(duration); 135 return new Pair<>(throughput, duration); 136 } 137 138 private long testFlushWithThroughputLimit() throws Exception { 139 final long throughputLimit = 1024 * 1024; 140 setMaxMinThroughputs(throughputLimit, throughputLimit); 141 Configuration conf = hbtu.getConfiguration(); 142 conf.setLong( 143 PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL, 144 throughputLimit); 145 hbtu.startMiniCluster(1); 146 Table table = hbtu.createTable(tableName, family); 147 Pair<Double, Long> result = generateAndFlushData(table); 148 hbtu.deleteTable(tableName); 149 LOG.debug("Throughput is: " + (result.getFirst() / 1024 / 1024) + " MB/s"); 150 // confirm that the speed limit work properly(not too fast, and also not too slow) 151 // 20% is the max acceptable error rate. 152 assertTrue(result.getFirst() < throughputLimit * 1.2); 153 assertTrue(result.getFirst() > throughputLimit * 0.8); 154 return result.getSecond(); 155 } 156 157 @Test 158 public void testFlushControl() throws Exception { 159 testFlushWithThroughputLimit(); 160 } 161 162 /** 163 * Test the tuning task of {@link PressureAwareFlushThroughputController} 164 */ 165 @Test 166 public void testFlushThroughputTuning() throws Exception { 167 Configuration conf = hbtu.getConfiguration(); 168 setMaxMinThroughputs(20L * 1024 * 1024, 10L * 1024 * 1024); 169 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); 170 conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD, 171 3000); 172 hbtu.startMiniCluster(1); 173 Connection conn = ConnectionFactory.createConnection(conf); 174 hbtu.getAdmin() 175 .createTable(TableDescriptorBuilder.newBuilder(tableName) 176 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false) 177 .build()); 178 hbtu.waitTableAvailable(tableName); 179 HRegionServer regionServer = hbtu.getRSForFirstRegionInTable(tableName); 180 double pressure = regionServer.getFlushPressure(); 181 LOG.debug("Flush pressure before flushing: " + pressure); 182 PressureAwareFlushThroughputController throughputController = 183 (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController(); 184 for (HRegion region : regionServer.getRegions()) { 185 region.flush(true); 186 } 187 // We used to assert that the flush pressure is zero but after HBASE-15787 or HBASE-18294 we 188 // changed to use heapSize instead of dataSize to calculate the flush pressure, and since 189 // heapSize will never be zero, so flush pressure will never be zero either. So we changed the 190 // assertion here. 191 assertTrue(regionServer.getFlushPressure() < pressure); 192 Thread.sleep(5000); 193 Table table = conn.getTable(tableName); 194 for (int i = 0; i < 10; i++) { 195 for (int j = 0; j < 10; j++) { 196 byte[] value = new byte[256 * 1024]; 197 Bytes.random(value); 198 table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); 199 } 200 } 201 Thread.sleep(5000); 202 double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure()); 203 assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON); 204 205 conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, 206 NoLimitThroughputController.class.getName()); 207 regionServer.onConfigurationChange(conf); 208 assertTrue(throughputController.isStopped()); 209 assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitThroughputController); 210 conn.close(); 211 } 212 213 /** 214 * Test the logic for striped store. 215 */ 216 @Test 217 public void testFlushControlForStripedStore() throws Exception { 218 hbtu.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY, 219 StripeStoreEngine.class.getName()); 220 testFlushWithThroughputLimit(); 221 } 222}