001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import static org.apache.hadoop.hbase.master.balancer.HeterogeneousCostRulesTestHelper.DEFAULT_RULES_FILE_NAME;
021import static org.apache.hadoop.hbase.master.balancer.HeterogeneousCostRulesTestHelper.createRulesFile;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.util.ArrayDeque;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Map;
032import java.util.Queue;
033import java.util.Random;
034import java.util.TreeMap;
035import java.util.concurrent.ThreadLocalRandom;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.client.RegionReplicaUtil;
043import org.apache.hadoop.hbase.master.RackManager;
044import org.apache.hadoop.hbase.master.RegionPlan;
045import org.apache.hadoop.hbase.testclassification.MasterTests;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.junit.BeforeClass;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054@Category({ MasterTests.class, MediumTests.class })
055public class TestStochasticLoadBalancerHeterogeneousCost extends StochasticBalancerTestBase {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestStochasticLoadBalancerHeterogeneousCost.class);
060
061  private static final Logger LOG =
062    LoggerFactory.getLogger(TestStochasticLoadBalancerHeterogeneousCost.class);
063  private static final double ALLOWED_WINDOW = 1.20;
064  private static final HBaseCommonTestingUtil HTU = new HBaseCommonTestingUtil();
065  private static String RULES_FILE;
066
067  @BeforeClass
068  public static void beforeAllTests() throws IOException {
069    conf = HTU.getConfiguration();
070    conf.setFloat("hbase.master.balancer.stochastic.regionCountCost", 0);
071    conf.setFloat("hbase.master.balancer.stochastic.primaryRegionCountCost", 0);
072    conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 0);
073    conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY,
074      HeterogeneousRegionCountCostFunction.class.getName());
075    // Need to ensure test dir has been created.
076    assertTrue(FileSystem.get(HTU.getConfiguration()).mkdirs(HTU.getDataTestDir()));
077    RULES_FILE = HTU.getDataTestDir(DEFAULT_RULES_FILE_NAME).toString();
078    conf.set(HeterogeneousRegionCountCostFunction.HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE,
079      RULES_FILE);
080    loadBalancer = new StochasticLoadTestBalancer();
081    loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
082    loadBalancer.initialize();
083  }
084
085  @Test
086  public void testDefault() throws IOException {
087    final List<String> rules = Collections.emptyList();
088
089    final int numNodes = 2;
090    final int numRegions = 300;
091    final int numRegionsPerServer = 250;
092
093    // Initial state: { rs1:50 , rs0:250 }
094    // Cluster can hold 300/400 regions (75%)
095    // Expected balanced Cluster: { rs0:150 , rs1:150 }
096    this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
097  }
098
099  @Test
100  public void testOneGroup() throws IOException {
101    final List<String> rules = Collections.singletonList("rs.* 100");
102
103    final int numNodes = 4;
104    final int numRegions = 300;
105    final int numRegionsPerServer = 30;
106
107    // Initial state: { rs0:30 , rs1:30 , rs2:30 , rs3:210 }.
108    // The cluster can hold 300/400 regions (75%)
109    // Expected balanced Cluster: { rs0:75 , rs1:75 , rs2:75 , rs3:75 }
110    this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
111  }
112
113  @Test
114  public void testTwoGroups() throws IOException {
115    final List<String> rules = Arrays.asList("rs[0-4] 200", "rs[5-9] 50");
116
117    final int numNodes = 10;
118    final int numRegions = 500;
119    final int numRegionsPerServer = 50;
120
121    // Initial state: { rs0:50 , rs1:50 , rs2:50 , rs3:50 , rs4:50 , rs5:50 , rs6:50 , rs7:50 ,
122    // rs8:50 , rs9:50 }
123    // the cluster can hold 500/1250 regions (40%)
124    // Expected balanced Cluster: { rs5:20 , rs6:20 , rs7:20 , rs8:20 , rs9:20 , rs0:80 , rs1:80 ,
125    // rs2:80 , rs3:80 , rs4:80 }
126    this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
127  }
128
129  @Test
130  public void testFourGroups() throws IOException {
131    final List<String> rules = Arrays.asList("rs[1-3] 200", "rs[4-7] 250", "rs[8-9] 100");
132
133    final int numNodes = 10;
134    final int numRegions = 800;
135    final int numRegionsPerServer = 80;
136
137    // Initial state: { rs0:80 , rs1:80 , rs2:80 , rs3:80 , rs4:80 , rs5:80 , rs6:80 , rs7:80 ,
138    // rs8:80 , rs9:80 }
139    // Cluster can hold 800/2000 regions (40%)
140    // Expected balanced Cluster: { rs8:40 , rs9:40 , rs2:80 , rs3:80 , rs1:82 , rs0:94 , rs4:96 ,
141    // rs5:96 , rs6:96 , rs7:96 }
142    this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
143  }
144
145  @Test
146  public void testOverloaded() throws IOException {
147    final int numNodes = 2;
148    final int numRegions = 120;
149    final int numRegionsPerServer = 60;
150
151    createRulesFile(RULES_FILE);
152    final Map<ServerName, List<RegionInfo>> serverMap =
153      this.createServerMap(numNodes, numRegions, numRegionsPerServer, 1, 1);
154    final List<RegionPlan> plans =
155      loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
156    // As we disabled all the other cost functions, balancing only according to
157    // the heterogeneous cost function should return nothing.
158    assertNull(plans);
159  }
160
161  private void testHeterogeneousWithCluster(final int numNodes, final int numRegions,
162    final int numRegionsPerServer, final List<String> rules) throws IOException {
163
164    createRulesFile(RULES_FILE, rules);
165    final Map<ServerName, List<RegionInfo>> serverMap =
166      this.createServerMap(numNodes, numRegions, numRegionsPerServer, 1, 1);
167    this.testWithCluster(serverMap, null, true, false);
168  }
169
170  @Override
171  protected void testWithCluster(final Map<ServerName, List<RegionInfo>> serverMap,
172    final RackManager rackManager, final boolean assertFullyBalanced,
173    final boolean assertFullyBalancedForReplicas) {
174    final List<ServerAndLoad> list = this.convertToList(serverMap);
175    LOG.info("Mock Cluster : " + this.printMock(list) + " " + this.printStats(list));
176
177    loadBalancer.setRackManager(rackManager);
178
179    // Run the balancer.
180    final List<RegionPlan> plans =
181      loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
182    assertNotNull(plans);
183
184    // Check to see that this actually got to a stable place.
185    if (assertFullyBalanced || assertFullyBalancedForReplicas) {
186      // Apply the plan to the mock cluster.
187      final List<ServerAndLoad> balancedCluster = this.reconcile(list, plans, serverMap);
188
189      // Print out the cluster loads to make debugging easier.
190      LOG.info("Mock Balanced cluster : " + this.printMock(balancedCluster));
191
192      if (assertFullyBalanced) {
193        final List<RegionPlan> secondPlans =
194          loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
195        assertNull(secondPlans);
196
197        // create external cost function to retrieve limit
198        // for each RS
199        final HeterogeneousRegionCountCostFunction cf =
200          new HeterogeneousRegionCountCostFunction(conf);
201        assertNotNull(cf);
202        BalancerClusterState cluster = new BalancerClusterState(serverMap, null, null, null);
203        cf.prepare(cluster);
204
205        // checking that we all hosts have a number of regions below their limit
206        for (final ServerAndLoad serverAndLoad : balancedCluster) {
207          final ServerName sn = serverAndLoad.getServerName();
208          final int numberRegions = serverAndLoad.getLoad();
209          final int limit = cf.findLimitForRS(sn);
210
211          double usage = (double) numberRegions / (double) limit;
212          LOG.debug(
213            sn.getHostname() + ":" + numberRegions + "/" + limit + "(" + (usage * 100) + "%)");
214
215          // as the balancer is stochastic, we cannot check exactly the result of the balancing,
216          // hence the allowedWindow parameter
217          assertTrue("Host " + sn.getHostname() + " should be below "
218            + cf.overallUsage * ALLOWED_WINDOW * 100 + "%; " + cf.overallUsage + ", " + usage + ", "
219            + numberRegions + ", " + limit, usage <= cf.overallUsage * ALLOWED_WINDOW);
220        }
221      }
222
223      if (assertFullyBalancedForReplicas) {
224        this.assertRegionReplicaPlacement(serverMap, rackManager);
225      }
226    }
227  }
228
229  @Override
230  protected Map<ServerName, List<RegionInfo>> createServerMap(int numNodes, int numRegions,
231    int numRegionsPerServer, int replication, int numTables) {
232    // construct a cluster of numNodes, having a total of numRegions. Each RS will hold
233    // numRegionsPerServer many regions except for the last one, which will host all the
234    // remaining regions
235    int[] cluster = new int[numNodes];
236    for (int i = 0; i < numNodes; i++) {
237      cluster[i] = numRegionsPerServer;
238    }
239    cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
240    Map<ServerName, List<RegionInfo>> clusterState = mockClusterServers(cluster, numTables);
241    if (replication > 0) {
242      // replicate the regions to the same servers
243      for (List<RegionInfo> regions : clusterState.values()) {
244        int length = regions.size();
245        for (int i = 0; i < length; i++) {
246          for (int r = 1; r < replication; r++) {
247            regions.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), r));
248          }
249        }
250      }
251    }
252
253    return clusterState;
254  }
255
256  @Override
257  protected TreeMap<ServerName, List<RegionInfo>> mockClusterServers(int[] mockCluster,
258    int numTables) {
259    int numServers = mockCluster.length;
260    TreeMap<ServerName, List<RegionInfo>> servers = new TreeMap<>();
261    for (int i = 0; i < numServers; i++) {
262      int numRegions = mockCluster[i];
263      ServerAndLoad sal = createServer("rs" + i);
264      List<RegionInfo> regions = randomRegions(numRegions, numTables);
265      servers.put(sal.getServerName(), regions);
266    }
267    return servers;
268  }
269
270  private Queue<ServerName> serverQueue = new ArrayDeque<>();
271
272  private ServerAndLoad createServer(final String host) {
273    if (!this.serverQueue.isEmpty()) {
274      ServerName sn = this.serverQueue.poll();
275      return new ServerAndLoad(sn, 0);
276    }
277    Random rand = ThreadLocalRandom.current();
278    int port = rand.nextInt(60000);
279    long startCode = rand.nextLong();
280    ServerName sn = ServerName.valueOf(host, port, startCode);
281    return new ServerAndLoad(sn, 0);
282  }
283
284  static class FairRandomCandidateGenerator extends RandomCandidateGenerator {
285
286    @Override
287    public BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer,
288      int otherServer) {
289      if (thisServer < 0 || otherServer < 0) {
290        return BalanceAction.NULL_ACTION;
291      }
292
293      int thisRegion = pickRandomRegion(cluster, thisServer, 0.5);
294      int otherRegion = pickRandomRegion(cluster, otherServer, 0.5);
295
296      return getAction(thisServer, thisRegion, otherServer, otherRegion);
297    }
298
299    @Override
300    BalanceAction generate(BalancerClusterState cluster) {
301      return super.generate(cluster);
302    }
303  }
304
305  static class StochasticLoadTestBalancer extends StochasticLoadBalancer {
306    private FairRandomCandidateGenerator fairRandomCandidateGenerator =
307      new FairRandomCandidateGenerator();
308
309    @Override
310    protected CandidateGenerator getRandomGenerator() {
311      return fairRandomCandidateGenerator;
312    }
313  }
314}