/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.throttle;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests flushing on a mini cluster configured to use
 * {@link PressureAwareFlushThroughputController}: the measured flush throughput must stay within
 * 20% of a fixed limit, and the controller's max throughput must follow the region server's flush
 * pressure.
 */
@Category(MediumTests.class)
public class TestFlushWithThroughputController {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFlushWithThroughputController.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestFlushWithThroughputController.class);
  /** Tolerance used when comparing throughput values (doubles). */
  private static final double EPSILON = 1E-6;

  private HBaseTestingUtility hbtu;
  @Rule
  public TestName testName = new TestName();
  private TableName tableName;
  private final byte[] family = Bytes.toBytes("f");
  private final byte[] qualifier = Bytes.toBytes("q");

  /**
   * Creates a fresh testing utility, derives the table name from the running test method and
   * selects {@link PressureAwareFlushThroughputController} as the flush throughput controller.
   * The mini cluster itself is started by each test after it finished tweaking the configuration.
   */
  @Before
  public void setUp() {
    hbtu = new HBaseTestingUtility();
    tableName = TableName.valueOf("Table-" + testName.getMethodName());
    hbtu.getConfiguration().set(
      FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
      PressureAwareFlushThroughputController.class.getName());
  }

  /** Shuts down the mini cluster started by the test. */
  @After
  public void tearDown() throws Exception {
    hbtu.shutdownMiniCluster();
  }

  /**
   * Looks up a store of the given table on the mini cluster.
   * @param tableName table whose store is wanted
   * @return the first store of the first region of {@code tableName} found on any region server,
   *         or {@code null} if no region server hosts a region of that table
   */
  private HStore getStoreWithName(TableName tableName) {
    MiniHBaseCluster cluster = hbtu.getMiniHBaseCluster();
    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
    for (JVMClusterUtil.RegionServerThread rst : rsts) {
      HRegionServer hrs = rst.getRegionServer();
      for (Region region : hrs.getRegions(tableName)) {
        // Only the first store of the first matching region is of interest.
        return ((HRegion) region).getStores().iterator().next();
      }
    }
    return null;
  }

  /**
   * Sets the upper and lower bound of the max flush throughput in the test configuration.
   * @param max value for the upper bound
   * @param min value for the lower bound
   */
  private void setMaxMinThroughputs(long max, long min) {
    Configuration conf = hbtu.getConfiguration();
    conf.setLong(
      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, min);
    conf.setLong(
      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, max);
  }

  /**
   * Writes Puts to the table and flushes few times.
   * @param table table to write to; must be the table named by {@link #tableName}
   * @return {@link Pair} of (throughput, duration): throughput is the total store file size
   *         divided by the time spent flushing, in bytes per second; duration is the accumulated
   *         flush time in nanoseconds.
   */
  private Pair<Double, Long> generateAndFlushData(Table table) throws IOException {
    // Internally, throughput is controlled after every cell write, so keep the value size small
    // for better control.
    final int numFlushes = 3;
    final int numPuts = 50;
    final int valueSize = 200 * 1024;
    Random rand = new Random();
    long duration = 0;
    for (int i = 0; i < numFlushes; i++) {
      // Write about 10M (10 times of throughput rate) per iteration.
      for (int j = 0; j < numPuts; j++) {
        byte[] value = new byte[valueSize];
        rand.nextBytes(value);
        // Stride by numPuts so that row keys written in different iterations never collide.
        table.put(new Put(Bytes.toBytes(i * numPuts + j)).addColumn(family, qualifier, value));
      }
      // Only the flush itself is timed, not the writes.
      long startTime = System.nanoTime();
      hbtu.getAdmin().flush(tableName);
      duration += System.nanoTime() - startTime;
    }
    HStore store = getStoreWithName(tableName);
    assertTrue("No store found for table " + tableName, store != null);
    assertEquals(numFlushes, store.getStorefilesCount());
    // Use fractional seconds: truncating to whole seconds would overstate the throughput and
    // divide by zero when the flushes take less than a second in total.
    double elapsedSeconds = (double) duration / TimeUnit.SECONDS.toNanos(1);
    double throughput = store.getStorefilesSize() / elapsedSeconds;
    return new Pair<>(throughput, duration);
  }

  /**
   * Starts a one-node mini cluster with the flush throughput pinned to 1 MB/s (upper bound equal
   * to lower bound), writes and flushes data, and asserts that the measured flush throughput is
   * within 20% of the limit.
   * @return the accumulated flush duration in nanoseconds
   */
  private long testFlushWithThroughputLimit() throws Exception {
    final long throughputLimit = 1024 * 1024;
    setMaxMinThroughputs(throughputLimit, throughputLimit);
    Configuration conf = hbtu.getConfiguration();
    conf.setLong(
      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL,
      throughputLimit);
    hbtu.startMiniCluster(1);
    Pair<Double, Long> result;
    try (Table table = hbtu.createTable(tableName, family)) {
      result = generateAndFlushData(table);
    }
    hbtu.deleteTable(tableName);
    LOG.debug("Throughput is: {} MB/s", result.getFirst() / 1024 / 1024);
    // Confirm that the speed limit works properly (not too fast, and also not too slow).
    // 20% is the max acceptable error rate.
    assertTrue("Flush throughput too high: " + result.getFirst(),
      result.getFirst() < throughputLimit * 1.2);
    assertTrue("Flush throughput too low: " + result.getFirst(),
      result.getFirst() > throughputLimit * 0.8);
    return result.getSecond();
  }

  /** Test flush throughput limiting with the store engine left at its configured default. */
  @Test
  public void testFlushControl() throws Exception {
    testFlushWithThroughputLimit();
  }

  /**
   * Test the tuning task of {@link PressureAwareFlushThroughputController}
   */
  @Test
  public void testFlushThroughputTuning() throws Exception {
    final long lowerBound = 10L * 1024 * 1024;
    final long upperBound = 20L * 1024 * 1024;
    Configuration conf = hbtu.getConfiguration();
    setMaxMinThroughputs(upperBound, lowerBound);
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
      3000);
    hbtu.startMiniCluster(1);
    // try-with-resources so the connection is released even when an assertion below fails.
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      hbtu.getAdmin().createTable(TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
        .build());
      hbtu.waitTableAvailable(tableName);
      HRegionServer regionServer = hbtu.getRSForFirstRegionInTable(tableName);
      double pressure = regionServer.getFlushPressure();
      LOG.debug("Flush pressure before flushing: {}", pressure);
      PressureAwareFlushThroughputController throughputController =
        (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController();
      for (HRegion region : regionServer.getRegions()) {
        region.flush(true);
      }
      // We used to assert that the flush pressure is zero but after HBASE-15787 or HBASE-18294 we
      // changed to use heapSize instead of dataSize to calculate the flush pressure, and since
      // heapSize will never be zero, so flush pressure will never be zero either. So we changed
      // the assertion here.
      assertTrue(regionServer.getFlushPressure() < pressure);
      // Presumably gives the tuning task (period set to 3000 above) time to run at least once.
      Thread.sleep(5000);
      boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(hbtu.getConfiguration());
      if (tablesOnMaster) {
        // Only checked when tables are hosted on the master: otherwise (e.g. meta on the
        // regionserver and not on the master) this math is off and it is unclear what the
        // expected value is supposed to be.
        assertEquals(lowerBound, throughputController.getMaxThroughput(), EPSILON);
      }
      try (Table table = conn.getTable(tableName)) {
        Random rand = new Random();
        for (int i = 0; i < 10; i++) {
          for (int j = 0; j < 10; j++) {
            byte[] value = new byte[256 * 1024];
            rand.nextBytes(value);
            table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
          }
        }
      }
      Thread.sleep(5000);
      double expectedThroughPut = lowerBound * (1 + regionServer.getFlushPressure());
      assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON);

      // Switching the controller class at runtime must stop the old controller and install the
      // new one.
      conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
        NoLimitThroughputController.class.getName());
      regionServer.onConfigurationChange(conf);
      assertTrue(throughputController.isStopped());
      assertTrue(
        regionServer.getFlushThroughputController() instanceof NoLimitThroughputController);
    }
  }

  /**
   * Test the logic for striped store.
   */
  @Test
  public void testFlushControlForStripedStore() throws Exception {
    hbtu.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY,
      StripeStoreEngine.class.getName());
    testFlushWithThroughputLimit();
  }
}