001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.List; 025import java.util.concurrent.TimeUnit; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.HBaseTestingUtility; 029import org.apache.hadoop.hbase.MiniHBaseCluster; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.ConnectionFactory; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 037import org.apache.hadoop.hbase.master.LoadBalancer; 038import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; 039import org.apache.hadoop.hbase.regionserver.HRegion; 040import org.apache.hadoop.hbase.regionserver.HRegionServer; 041import org.apache.hadoop.hbase.regionserver.HStore; 042import org.apache.hadoop.hbase.regionserver.Region; 043import org.apache.hadoop.hbase.regionserver.StoreEngine; 044import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; 045import org.apache.hadoop.hbase.testclassification.LargeTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.JVMClusterUtil; 048import org.apache.hadoop.hbase.util.Pair; 049import org.junit.After; 050import org.junit.Before; 051import org.junit.ClassRule; 052import org.junit.Rule; 053import org.junit.Test; 054import org.junit.experimental.categories.Category; 055import org.junit.rules.TestName; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059@Category(LargeTests.class) 060public class TestFlushWithThroughputController { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestFlushWithThroughputController.class); 065 066 private static final Logger LOG = 067 LoggerFactory.getLogger(TestFlushWithThroughputController.class); 068 private static final double EPSILON = 1.3E-6; 069 070 private HBaseTestingUtility hbtu; 071 @Rule 072 public TestName testName = new TestName(); 073 private TableName tableName; 074 private final byte[] family = Bytes.toBytes("f"); 075 private final byte[] qualifier = Bytes.toBytes("q"); 076 077 @Before 078 public void setUp() { 079 hbtu = new HBaseTestingUtility(); 080 tableName = TableName.valueOf("Table-" + testName.getMethodName()); 081 hbtu.getConfiguration().set( 082 FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, 083 PressureAwareFlushThroughputController.class.getName()); 084 } 085 086 @After 087 public void tearDown() throws Exception { 088 hbtu.shutdownMiniCluster(); 089 } 090 091 private HStore getStoreWithName(TableName tableName) { 092 MiniHBaseCluster cluster = hbtu.getMiniHBaseCluster(); 093 List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); 094 for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { 095 HRegionServer hrs = rsts.get(i).getRegionServer(); 096 for (Region region : hrs.getRegions(tableName)) { 097 return ((HRegion) region).getStores().iterator().next(); 098 } 099 } 100 return null; 101 } 102 103 private void setMaxMinThroughputs(long max, long min) { 104 Configuration conf = hbtu.getConfiguration(); 105 conf.setLong( 106 PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, min); 107 conf.setLong( 108 PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, max); 109 } 110 111 /** 112 * Writes Puts to the table and flushes few times. 113 * @return {@link Pair} of (throughput, duration). 114 */ 115 private Pair<Double, Long> generateAndFlushData(Table table) throws IOException { 116 // Internally, throughput is controlled after every cell write, so keep value size less for 117 // better control. 118 final int NUM_FLUSHES = 3, NUM_PUTS = 50, VALUE_SIZE = 200 * 1024; 119 long duration = 0; 120 for (int i = 0; i < NUM_FLUSHES; i++) { 121 // Write about 10M (10 times of throughput rate) per iteration. 122 for (int j = 0; j < NUM_PUTS; j++) { 123 byte[] value = new byte[VALUE_SIZE]; 124 Bytes.random(value); 125 table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); 126 } 127 long startTime = System.nanoTime(); 128 hbtu.getAdmin().flush(tableName); 129 duration += System.nanoTime() - startTime; 130 } 131 HStore store = getStoreWithName(tableName); 132 assertEquals(NUM_FLUSHES, store.getStorefilesCount()); 133 double throughput = 134 (double) store.getStorefilesSize() / TimeUnit.NANOSECONDS.toSeconds(duration); 135 return new Pair<>(throughput, duration); 136 } 137 138 private long testFlushWithThroughputLimit() throws Exception { 139 final long throughputLimit = 1024 * 1024; 140 setMaxMinThroughputs(throughputLimit, throughputLimit); 141 Configuration conf = hbtu.getConfiguration(); 142 conf.setLong( 143 PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL, 144 throughputLimit); 145 hbtu.startMiniCluster(1); 146 Table table = hbtu.createTable(tableName, family); 147 Pair<Double, Long> result = generateAndFlushData(table); 148 hbtu.deleteTable(tableName); 149 LOG.debug("Throughput is: " + (result.getFirst() / 1024 / 1024) + " MB/s"); 150 // confirm that the speed limit work properly(not too fast, and also not too slow) 151 // 20% is the max acceptable error rate. 152 assertTrue(result.getFirst() < throughputLimit * 1.2); 153 assertTrue(result.getFirst() > throughputLimit * 0.8); 154 return result.getSecond(); 155 } 156 157 @Test 158 public void testFlushControl() throws Exception { 159 testFlushWithThroughputLimit(); 160 } 161 162 /** 163 * Test the tuning task of {@link PressureAwareFlushThroughputController} 164 */ 165 @Test 166 public void testFlushThroughputTuning() throws Exception { 167 Configuration conf = hbtu.getConfiguration(); 168 setMaxMinThroughputs(20L * 1024 * 1024, 10L * 1024 * 1024); 169 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); 170 conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD, 171 3000); 172 hbtu.startMiniCluster(1); 173 Connection conn = ConnectionFactory.createConnection(conf); 174 hbtu.getAdmin() 175 .createTable(TableDescriptorBuilder.newBuilder(tableName) 176 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false) 177 .build()); 178 hbtu.waitTableAvailable(tableName); 179 HRegionServer regionServer = hbtu.getRSForFirstRegionInTable(tableName); 180 double pressure = regionServer.getFlushPressure(); 181 LOG.debug("Flush pressure before flushing: " + pressure); 182 PressureAwareFlushThroughputController throughputController = 183 (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController(); 184 for (HRegion region : regionServer.getRegions()) { 185 region.flush(true); 186 } 187 // We used to assert that the flush pressure is zero but after HBASE-15787 or HBASE-18294 we 188 // changed to use heapSize instead of dataSize to calculate the flush pressure, and since 189 // heapSize will never be zero, so flush pressure will never be zero either. So we changed the 190 // assertion here. 191 assertTrue(regionServer.getFlushPressure() < pressure); 192 Thread.sleep(5000); 193 boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(hbtu.getConfiguration()); 194 if (tablesOnMaster) { 195 // If no tables on the master, this math is off and I'm not sure what it is supposed to be 196 // when meta is on the regionserver and not on the master. 197 assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); 198 } 199 Table table = conn.getTable(tableName); 200 for (int i = 0; i < 10; i++) { 201 for (int j = 0; j < 10; j++) { 202 byte[] value = new byte[256 * 1024]; 203 Bytes.random(value); 204 table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); 205 } 206 } 207 Thread.sleep(5000); 208 double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure()); 209 assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON); 210 211 conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, 212 NoLimitThroughputController.class.getName()); 213 regionServer.onConfigurationChange(conf); 214 assertTrue(throughputController.isStopped()); 215 assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitThroughputController); 216 conn.close(); 217 } 218 219 /** 220 * Test the logic for striped store. 221 */ 222 @Test 223 public void testFlushControlForStripedStore() throws Exception { 224 hbtu.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY, 225 StripeStoreEngine.class.getName()); 226 testFlushWithThroughputLimit(); 227 } 228}