001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import static junit.framework.TestCase.assertNotNull;
021import static junit.framework.TestCase.assertTrue;
022import static org.junit.Assert.assertNull;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.LinkedList;
030import java.util.List;
031import java.util.Map;
032import java.util.Queue;
033import java.util.Random;
034import java.util.TreeMap;
035import java.util.concurrent.ThreadLocalRandom;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.client.RegionReplicaUtil;
043import org.apache.hadoop.hbase.master.MasterServices;
044import org.apache.hadoop.hbase.master.RackManager;
045import org.apache.hadoop.hbase.master.RegionPlan;
046import org.apache.hadoop.hbase.testclassification.MasterTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055@Category({ MasterTests.class, MediumTests.class })
056public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBase {
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestStochasticLoadBalancerHeterogeneousCost.class);
060
061  private static final Logger LOG =
062    LoggerFactory.getLogger(TestStochasticLoadBalancerHeterogeneousCost.class);
063  private static final double ALLOWED_WINDOW = 1.20;
064  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
065  private static String RULES_FILE;
066
067  @BeforeClass
068  public static void beforeAllTests() throws IOException {
069    BalancerTestBase.conf = HTU.getConfiguration();
070    BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.regionCountCost", 0);
071    BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.primaryRegionCountCost", 0);
072    BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 0);
073    BalancerTestBase.conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY,
074      HeterogeneousRegionCountCostFunction.class.getName());
075    // Need to ensure test dir has been created.
076    assertTrue(FileSystem.get(HTU.getConfiguration()).mkdirs(HTU.getDataTestDir()));
077    RULES_FILE =
078      HTU.getDataTestDir(TestStochasticLoadBalancerHeterogeneousCostRules.DEFAULT_RULES_FILE_NAME)
079        .toString();
080    BalancerTestBase.conf.set(
081      HeterogeneousRegionCountCostFunction.HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE,
082      RULES_FILE);
083    loadBalancer = new StochasticLoadTestBalancer();
084    MasterServices services = mock(MasterServices.class);
085    when(services.getConfiguration()).thenReturn(conf);
086    BalancerTestBase.loadBalancer.setMasterServices(services);
087    loadBalancer.initialize();
088    loadBalancer.getCandidateGenerators().add(new FairRandomCandidateGenerator());
089  }
090
091  @Test
092  public void testDefault() throws IOException {
093    final List<String> rules = Collections.emptyList();
094
095    final int numNodes = 2;
096    final int numRegions = 300;
097    final int numRegionsPerServer = 250;
098
099    // Initial state: { rs1:50 , rs0:250 }
100    // Cluster can hold 300/400 regions (75%)
101    // Expected balanced Cluster: { rs0:150 , rs1:150 }
102    this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
103  }
104
105  @Test
106  public void testOneGroup() throws IOException {
107    final List<String> rules = Collections.singletonList("rs.* 100");
108
109    final int numNodes = 4;
110    final int numRegions = 300;
111    final int numRegionsPerServer = 30;
112
113    // Initial state: { rs0:30 , rs1:30 , rs2:30 , rs3:210 }.
114    // The cluster can hold 300/400 regions (75%)
115    // Expected balanced Cluster: { rs0:75 , rs1:75 , rs2:75 , rs3:75 }
116    this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
117  }
118
119  @Test
120  public void testTwoGroups() throws IOException {
121    final List<String> rules = Arrays.asList("rs[0-4] 200", "rs[5-9] 50");
122
123    final int numNodes = 10;
124    final int numRegions = 500;
125    final int numRegionsPerServer = 50;
126
127    // Initial state: { rs0:50 , rs1:50 , rs2:50 , rs3:50 , rs4:50 , rs5:50 , rs6:50 , rs7:50 ,
128    // rs8:50 , rs9:50 }
129    // the cluster can hold 500/1250 regions (40%)
130    // Expected balanced Cluster: { rs5:20 , rs6:20 , rs7:20 , rs8:20 , rs9:20 , rs0:80 , rs1:80 ,
131    // rs2:80 , rs3:80 , rs4:80 }
132    this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
133  }
134
135  @Test
136  public void testFourGroups() throws IOException {
137    final List<String> rules = Arrays.asList("rs[1-3] 200", "rs[4-7] 250", "rs[8-9] 100");
138
139    final int numNodes = 10;
140    final int numRegions = 800;
141    final int numRegionsPerServer = 80;
142
143    // Initial state: { rs0:80 , rs1:80 , rs2:80 , rs3:80 , rs4:80 , rs5:80 , rs6:80 , rs7:80 ,
144    // rs8:80 , rs9:80 }
145    // Cluster can hold 800/2000 regions (40%)
146    // Expected balanced Cluster: { rs8:40 , rs9:40 , rs2:80 , rs3:80 , rs1:82 , rs0:94 , rs4:96 ,
147    // rs5:96 , rs6:96 , rs7:96 }
148    this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
149  }
150
151  @Test
152  public void testOverloaded() throws IOException {
153    final List<String> rules = Collections.singletonList("rs[0-1] 50");
154
155    final int numNodes = 2;
156    final int numRegions = 120;
157    final int numRegionsPerServer = 60;
158
159    TestStochasticLoadBalancerHeterogeneousCostRules.createRulesFile(RULES_FILE);
160    final Map<ServerName, List<RegionInfo>> serverMap =
161      this.createServerMap(numNodes, numRegions, numRegionsPerServer, 1, 1);
162    final List<RegionPlan> plans =
163      BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
164    // As we disabled all the other cost functions, balancing only according to
165    // the heterogeneous cost function should return nothing.
166    assertNull(plans);
167  }
168
169  private void testHeterogeneousWithCluster(final int numNodes, final int numRegions,
170    final int numRegionsPerServer, final List<String> rules) throws IOException {
171
172    TestStochasticLoadBalancerHeterogeneousCostRules.createRulesFile(RULES_FILE, rules);
173    final Map<ServerName, List<RegionInfo>> serverMap =
174      this.createServerMap(numNodes, numRegions, numRegionsPerServer, 1, 1);
175    this.testWithCluster(serverMap, null, true, false);
176  }
177
178  protected void testWithCluster(final Map<ServerName, List<RegionInfo>> serverMap,
179    final RackManager rackManager, final boolean assertFullyBalanced,
180    final boolean assertFullyBalancedForReplicas) {
181    final List<ServerAndLoad> list = this.convertToList(serverMap);
182    LOG.info("Mock Cluster : " + this.printMock(list) + " " + this.printStats(list));
183
184    BalancerTestBase.loadBalancer.setRackManager(rackManager);
185
186    // Run the balancer.
187    final List<RegionPlan> plans =
188      BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
189    assertNotNull(plans);
190
191    // Check to see that this actually got to a stable place.
192    if (assertFullyBalanced || assertFullyBalancedForReplicas) {
193      // Apply the plan to the mock cluster.
194      final List<ServerAndLoad> balancedCluster = this.reconcile(list, plans, serverMap);
195
196      // Print out the cluster loads to make debugging easier.
197      LOG.info("Mock Balanced cluster : " + this.printMock(balancedCluster));
198
199      if (assertFullyBalanced) {
200        final List<RegionPlan> secondPlans =
201          BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
202        assertNull(secondPlans);
203
204        // create external cost function to retrieve limit
205        // for each RS
206        final HeterogeneousRegionCountCostFunction cf =
207          new HeterogeneousRegionCountCostFunction(conf);
208        assertNotNull(cf);
209        BalancerClusterState cluster = new BalancerClusterState(serverMap, null, null, null);
210        cf.prepare(cluster);
211
212        // checking that we all hosts have a number of regions below their limit
213        for (final ServerAndLoad serverAndLoad : balancedCluster) {
214          final ServerName sn = serverAndLoad.getServerName();
215          final int numberRegions = serverAndLoad.getLoad();
216          final int limit = cf.findLimitForRS(sn);
217
218          double usage = (double) numberRegions / (double) limit;
219          LOG.debug(
220            sn.getHostname() + ":" + numberRegions + "/" + limit + "(" + (usage * 100) + "%)");
221
222          // as the balancer is stochastic, we cannot check exactly the result of the balancing,
223          // hence the allowedWindow parameter
224          assertTrue("Host " + sn.getHostname() + " should be below "
225            + cf.overallUsage * ALLOWED_WINDOW * 100 + "%; " + cf.overallUsage + ", " + usage + ", "
226            + numberRegions + ", " + limit, usage <= cf.overallUsage * ALLOWED_WINDOW);
227        }
228      }
229
230      if (assertFullyBalancedForReplicas) {
231        this.assertRegionReplicaPlacement(serverMap, rackManager);
232      }
233    }
234  }
235
236  @Override
237  protected Map<ServerName, List<RegionInfo>> createServerMap(int numNodes, int numRegions,
238    int numRegionsPerServer, int replication, int numTables) {
239    // construct a cluster of numNodes, having a total of numRegions. Each RS will hold
240    // numRegionsPerServer many regions except for the last one, which will host all the
241    // remaining regions
242    int[] cluster = new int[numNodes];
243    for (int i = 0; i < numNodes; i++) {
244      cluster[i] = numRegionsPerServer;
245    }
246    cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
247    Map<ServerName, List<RegionInfo>> clusterState = mockClusterServers(cluster, numTables);
248    if (replication > 0) {
249      // replicate the regions to the same servers
250      for (List<RegionInfo> regions : clusterState.values()) {
251        int length = regions.size();
252        for (int i = 0; i < length; i++) {
253          for (int r = 1; r < replication; r++) {
254            regions.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), r));
255          }
256        }
257      }
258    }
259
260    return clusterState;
261  }
262
263  @Override
264  protected TreeMap<ServerName, List<RegionInfo>> mockClusterServers(int[] mockCluster,
265    int numTables) {
266    int numServers = mockCluster.length;
267    TreeMap<ServerName, List<RegionInfo>> servers = new TreeMap<>();
268    for (int i = 0; i < numServers; i++) {
269      int numRegions = mockCluster[i];
270      ServerAndLoad sal = createServer("rs" + i);
271      List<RegionInfo> regions = randomRegions(numRegions, numTables);
272      servers.put(sal.getServerName(), regions);
273    }
274    return servers;
275  }
276
277  private Queue<ServerName> serverQueue = new LinkedList<>();
278
279  private ServerAndLoad createServer(final String host) {
280    if (!this.serverQueue.isEmpty()) {
281      ServerName sn = this.serverQueue.poll();
282      return new ServerAndLoad(sn, 0);
283    }
284    Random rand = ThreadLocalRandom.current();
285    int port = rand.nextInt(60000);
286    long startCode = rand.nextLong();
287    ServerName sn = ServerName.valueOf(host, port, startCode);
288    return new ServerAndLoad(sn, 0);
289  }
290
291  static class FairRandomCandidateGenerator extends RandomCandidateGenerator {
292
293    @Override
294    public BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer,
295      int otherServer) {
296      if (thisServer < 0 || otherServer < 0) {
297        return BalanceAction.NULL_ACTION;
298      }
299
300      int thisRegion = pickRandomRegion(cluster, thisServer, 0.5);
301      int otherRegion = pickRandomRegion(cluster, otherServer, 0.5);
302
303      return getAction(thisServer, thisRegion, otherServer, otherRegion);
304    }
305
306    @Override
307    BalanceAction generate(BalancerClusterState cluster) {
308      return super.generate(cluster);
309    }
310  }
311
312  static class StochasticLoadTestBalancer extends StochasticLoadBalancer {
313    private FairRandomCandidateGenerator fairRandomCandidateGenerator =
314      new FairRandomCandidateGenerator();
315
316    @Override
317    protected CandidateGenerator getRandomGenerator() {
318      return fairRandomCandidateGenerator;
319    }
320  }
321}