001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.lang.reflect.Constructor; 022import java.util.ArrayDeque; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collections; 026import java.util.Deque; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.StringJoiner; 031import java.util.concurrent.ThreadLocalRandom; 032import java.util.concurrent.TimeUnit; 033import java.util.function.Supplier; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.ClusterMetrics; 036import org.apache.hadoop.hbase.HBaseInterfaceAudience; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.RegionMetrics; 039import org.apache.hadoop.hbase.ServerMetrics; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.BalancerDecision; 043import org.apache.hadoop.hbase.client.BalancerRejection; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import 
org.apache.hadoop.hbase.master.RackManager; 046import org.apache.hadoop.hbase.master.RegionPlan; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.hadoop.hbase.util.Pair; 049import org.apache.hadoop.hbase.util.ReflectionUtils; 050import org.apache.yetus.audience.InterfaceAudience; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; 055 056/** 057 * <p> 058 * This is a best effort load balancer. Given a Cost function F(C) => x It will randomly try and 059 * mutate the cluster to Cprime. If F(Cprime) < F(C) then the new cluster state becomes the plan. 060 * It includes costs functions to compute the cost of: 061 * </p> 062 * <ul> 063 * <li>Region Load</li> 064 * <li>Table Load</li> 065 * <li>Data Locality</li> 066 * <li>Memstore Sizes</li> 067 * <li>Storefile Sizes</li> 068 * </ul> 069 * <p> 070 * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best 071 * solution, and 1 is the highest possible cost and the worst solution. 
 * The computed costs are scaled by their respective multipliers:
 * </p>
 * <ul>
 * <li>hbase.master.balancer.stochastic.regionLoadCost</li>
 * <li>hbase.master.balancer.stochastic.moveCost</li>
 * <li>hbase.master.balancer.stochastic.tableLoadCost</li>
 * <li>hbase.master.balancer.stochastic.localityCost</li>
 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
 * </ul>
 * <p>
 * You can also add custom cost functions by setting the following configuration value:
 * </p>
 * <ul>
 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
 * </ul>
 * <p>
 * All custom cost functions need to extend {@link CostFunction}.
 * </p>
 * <p>
 * In addition to the above configurations, the balancer can be tuned by the following
 * configuration values:
 * </p>
 * <ul>
 * <li>hbase.master.balancer.stochastic.maxMoveRegions, which controls the maximum number of
 * regions that can be moved in a single invocation of this balancer.</li>
 * <li>hbase.master.balancer.stochastic.stepsPerRegion, which is the coefficient by which the
 * number of regions is multiplied to try and get the number of times the balancer will mutate all
 * servers.</li>
 * <li>hbase.master.balancer.stochastic.maxSteps, which controls the maximum number of times that
 * the balancer will try and mutate all the servers. The balancer will use the minimum of this
 * value and the above computation.</li>
 * </ul>
 * <p>
 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the
 * balancer gets the full picture of all loads on the cluster.
* </p>
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class StochasticLoadBalancer extends BaseLoadBalancer {

private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

// ---------------------------------------------------------------------------------------------
// Configuration keys and their defaults. The tunables below are read in loadConf(Configuration).
// ---------------------------------------------------------------------------------------------

/** Key of the coefficient multiplied with region and server counts to size the walk. */
protected static final String STEPS_PER_REGION_KEY =
  "hbase.master.balancer.stochastic.stepsPerRegion";
protected static final int DEFAULT_STEPS_PER_REGION = 800;
/** Key of the configured step limit that is combined with the calculated step count. */
protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
protected static final int DEFAULT_MAX_STEPS = 1000000;
/** Key of the flag that selects max(...) instead of min(...) when combining the step limits. */
protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps";
protected static final boolean DEFAULT_RUN_MAX_STEPS = false;
/** Key of the time budget added to the start time to obtain the walk's stop deadline. */
protected static final String MAX_RUNNING_TIME_KEY =
  "hbase.master.balancer.stochastic.maxRunningTime";
protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds.
/** Key whose value is stored in {@code numRegionLoadsToRemember}. */
protected static final String KEEP_REGION_LOADS =
  "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
protected static final int DEFAULT_KEEP_REGION_LOADS = 15;
private static final String TABLE_FUNCTION_SEP = "_";
/** Key of the threshold compared against the weighted average cost in needsBalance. */
protected static final String MIN_COST_NEED_BALANCE_KEY =
  "hbase.master.balancer.stochastic.minCostNeedBalance";
protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f;
/** Key listing the class names of additional, user supplied cost functions. */
protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
  "hbase.master.balancer.stochastic.additionalCostFunctions";
/** Name under which the overall cost is reported to the metrics object. */
public static final String OVERALL_COST_FUNCTION_NAME = "Overall";

// Region load history handed to createState(...) whenever a cluster state is built.
Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

// values are defaults
private int maxSteps = DEFAULT_MAX_STEPS;
private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS;
private int stepsPerRegion = DEFAULT_STEPS_PER_REGION;
private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>();

protected List<CostFunction> costFunctions; // FindBugs: Wants this protected;
                                            // IS2_INCONSISTENT_SYNC
// Sum of the multipliers of the cost functions that are currently needed. It is recomputed by
// needsBalance(...) and balanceTable(...).
// NOTE(review): an older comment said this is "Defaulted at 1", but there is no initializer here,
// so the field holds 0 until one of those methods has run.
private float sumMultiplier;
// to save and report costs to JMX
private double curOverallCost = 0d;
// Scratch array; its content is copied into curFunctionCosts after a cost computation.
private double[] tempFunctionCosts;
private double[] curFunctionCosts;

// Keep locality based picker and cost function to alert them
// when new services are offered
private LocalityBasedCandidateGenerator localityCandidateGenerator;
private ServerLocalityCostFunction localityCost;
private RackLocalityCostFunction rackLocalityCost;
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

// Per-generator weights read by getRandomGenerator(...) for the weighted random selection.
private final Map<Class<? extends CandidateGenerator>, Double> weightsOfGenerators =
  new HashMap<>();
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
// Shuffled snapshot of the generator classes, memoized with a 5 second expiration. Because it is
// a snapshot, it can briefly disagree with candidateGenerators after that map is replaced.
protected final Supplier<List<Class<? extends CandidateGenerator>>> shuffledGeneratorClasses =
  Suppliers.memoizeWithExpiration(() -> {
    List<Class<? extends CandidateGenerator>> shuffled =
      new ArrayList<>(candidateGenerators.keySet());
    Collections.shuffle(shuffled);
    return shuffled;
  }, 5, TimeUnit.SECONDS);

private final BalancerConditionals balancerConditionals = BalancerConditionals.create();
extends CandidateGenerator>> shuffled = 170 new ArrayList<>(candidateGenerators.keySet()); 171 Collections.shuffle(shuffled); 172 return shuffled; 173 }, 5, TimeUnit.SECONDS); 174 175 private final BalancerConditionals balancerConditionals = BalancerConditionals.create(); 176 177 /** 178 * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its 179 * default MetricsBalancer 180 */ 181 public StochasticLoadBalancer() { 182 super(new MetricsStochasticBalancer()); 183 } 184 185 @RestrictedApi(explanation = "Should only be called in tests", link = "", 186 allowedOnPath = ".*/src/test/.*") 187 public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) { 188 super(metricsStochasticBalancer); 189 } 190 191 private static CostFunction createCostFunction(Class<? extends CostFunction> clazz, 192 Configuration conf) { 193 try { 194 Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class); 195 return ReflectionUtils.instantiate(clazz.getName(), ctor, conf); 196 } catch (NoSuchMethodException e) { 197 // will try construct with no parameter 198 } 199 return ReflectionUtils.newInstance(clazz); 200 } 201 202 private void loadCustomCostFunctions(Configuration conf) { 203 String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY); 204 205 if (null == functionsNames) { 206 return; 207 } 208 for (String className : functionsNames) { 209 Class<? 
extends CostFunction> clazz; 210 try { 211 clazz = Class.forName(className).asSubclass(CostFunction.class); 212 } catch (ClassNotFoundException e) { 213 LOG.warn("Cannot load class '{}': {}", className, e.getMessage()); 214 continue; 215 } 216 CostFunction func = createCostFunction(clazz, conf); 217 LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName()); 218 costFunctions.add(func); 219 } 220 } 221 222 @RestrictedApi(explanation = "Should only be called in tests", link = "", 223 allowedOnPath = ".*/src/test/.*") 224 Map<Class<? extends CandidateGenerator>, CandidateGenerator> getCandidateGenerators() { 225 return this.candidateGenerators; 226 } 227 228 protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> 229 createCandidateGenerators(Configuration conf) { 230 balancerConditionals.setConf(conf); 231 Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators; 232 if (balancerConditionals.isReplicaDistributionEnabled()) { 233 candidateGenerators = new HashMap<>(3); 234 candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator()); 235 candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator()); 236 candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator); 237 } else { 238 candidateGenerators = new HashMap<>(5); 239 candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator()); 240 candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator()); 241 candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator); 242 candidateGenerators.put(RegionReplicaCandidateGenerator.class, 243 new RegionReplicaCandidateGenerator()); 244 candidateGenerators.put(RegionReplicaRackCandidateGenerator.class, 245 new RegionReplicaRackCandidateGenerator()); 246 } 247 return candidateGenerators; 248 } 249 250 protected List<CostFunction> 
createCostFunctions(Configuration conf) { 251 List<CostFunction> costFunctions = new ArrayList<>(); 252 addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf)); 253 addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf)); 254 addCostFunction(costFunctions, new MoveCostFunction(conf, provider)); 255 addCostFunction(costFunctions, localityCost); 256 addCostFunction(costFunctions, rackLocalityCost); 257 addCostFunction(costFunctions, new TableSkewCostFunction(conf)); 258 addCostFunction(costFunctions, new StoreFileTableSkewCostFunction(conf)); 259 addCostFunction(costFunctions, regionReplicaHostCostFunction); 260 addCostFunction(costFunctions, regionReplicaRackCostFunction); 261 addCostFunction(costFunctions, new ReadRequestCostFunction(conf)); 262 addCostFunction(costFunctions, new CPRequestCostFunction(conf)); 263 addCostFunction(costFunctions, new WriteRequestCostFunction(conf)); 264 addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf)); 265 addCostFunction(costFunctions, new StoreFileCostFunction(conf)); 266 return costFunctions; 267 } 268 269 @Override 270 protected void loadConf(Configuration conf) { 271 super.loadConf(conf); 272 maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS); 273 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION); 274 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME); 275 runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS); 276 277 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS); 278 minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE); 279 localityCandidateGenerator = new LocalityBasedCandidateGenerator(); 280 localityCost = new ServerLocalityCostFunction(conf); 281 rackLocalityCost = new RackLocalityCostFunction(conf); 282 283 balancerConditionals.setConf(conf); 284 this.candidateGenerators = createCandidateGenerators(conf); 285 286 
regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); 287 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); 288 this.costFunctions = createCostFunctions(conf); 289 loadCustomCostFunctions(conf); 290 291 curFunctionCosts = new double[costFunctions.size()]; 292 tempFunctionCosts = new double[costFunctions.size()]; 293 294 LOG.info( 295 "Loaded config: maxSteps={}, runMaxSteps={}, stepsPerRegion={}, maxRunningTime={}, " 296 + "isByTable={}, CostFunctions={}, sum of multiplier of cost functions = {} etc.", 297 maxSteps, runMaxSteps, stepsPerRegion, maxRunningTime, isByTable, 298 Arrays.toString(getCostFunctionNames()), sumMultiplier); 299 } 300 301 @Override 302 public void updateClusterMetrics(ClusterMetrics st) { 303 super.updateClusterMetrics(st); 304 updateRegionLoad(); 305 306 // update metrics size 307 try { 308 // by-table or ensemble mode 309 int tablesCount = isByTable ? provider.getNumberOfTables() : 1; 310 int functionsCount = getCostFunctionNames().length; 311 312 updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall 313 } catch (Exception e) { 314 LOG.error("failed to get the size of all tables", e); 315 } 316 } 317 318 private void updateBalancerTableLoadInfo(TableName tableName, 319 Map<ServerName, List<RegionInfo>> loadOfOneTable) { 320 RegionHDFSBlockLocationFinder finder = null; 321 if ((this.localityCost != null) || (this.rackLocalityCost != null)) { 322 finder = this.regionFinder; 323 } 324 BalancerClusterState cluster = createState(loadOfOneTable, loads, finder, rackManager); 325 326 initCosts(cluster); 327 curOverallCost = computeCost(cluster, Double.MAX_VALUE); 328 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 329 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 330 } 331 332 protected BalancerClusterState createState(Map<ServerName, List<RegionInfo>> clusterState, 333 Map<String, Deque<BalancerRegionLoad>> loads, 
RegionHDFSBlockLocationFinder finder, 334 RackManager rackManager) { 335 return new BalancerClusterState(clusterState, loads, finder, rackManager); 336 } 337 338 @Override 339 public void 340 updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) { 341 if (isByTable) { 342 loadOfAllTable.forEach((tableName, loadOfOneTable) -> { 343 updateBalancerTableLoadInfo(tableName, loadOfOneTable); 344 }); 345 } else { 346 updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME, 347 toEnsumbleTableLoad(loadOfAllTable)); 348 } 349 } 350 351 /** 352 * Update the number of metrics that are reported to JMX 353 */ 354 @RestrictedApi(explanation = "Should only be called in tests", link = "", 355 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 356 void updateMetricsSize(int size) { 357 if (metricsBalancer instanceof MetricsStochasticBalancer) { 358 ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size); 359 } 360 } 361 362 private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c) { 363 if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) { 364 // This check is unnecessary without replicas, or with conditional replica distribution 365 // The balancer will auto-run if conditional replica distribution candidates are available 366 return false; 367 } 368 if (c.numHosts >= c.maxReplicas) { 369 regionReplicaHostCostFunction.prepare(c); 370 double hostCost = Math.abs(regionReplicaHostCostFunction.cost()); 371 boolean colocatedAtHost = hostCost > CostFunction.getCostEpsilon(hostCost); 372 if (colocatedAtHost) { 373 return true; 374 } 375 LOG.trace("No host colocation detected with host cost={}", hostCost); 376 } 377 return false; 378 } 379 380 private boolean areSomeRegionReplicasColocatedOnRack(BalancerClusterState c) { 381 if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) { 382 // This check is unnecessary without replicas, or with conditional 
replica distribution 383 // The balancer will auto-run if conditional replica distribution candidates are available 384 return false; 385 } 386 if (c.numRacks >= c.maxReplicas) { 387 regionReplicaRackCostFunction.prepare(c); 388 double rackCost = Math.abs(regionReplicaRackCostFunction.cost()); 389 boolean colocatedAtRack = rackCost > CostFunction.getCostEpsilon(rackCost); 390 if (colocatedAtRack) { 391 return true; 392 } 393 LOG.trace("No rack colocation detected with rack cost={}", rackCost); 394 } else { 395 LOG.trace("Rack colocation is inevitable with fewer racks than replicas, " 396 + "so we won't bother checking"); 397 } 398 return false; 399 } 400 401 private String getBalanceReason(double total, double sumMultiplier) { 402 if (total <= 0) { 403 return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0"; 404 } else if (sumMultiplier <= 0) { 405 return "sumMultiplier = " + sumMultiplier + " <= 0"; 406 } else if ((total / sumMultiplier) < minCostNeedBalance) { 407 return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " 408 + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")"; 409 } else { 410 return ""; 411 } 412 } 413 414 @RestrictedApi(explanation = "Should only be called in tests", link = "", 415 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 416 boolean needsBalance(TableName tableName, BalancerClusterState cluster) { 417 ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); 418 if (cs.getNumServers() < MIN_SERVER_BALANCE) { 419 LOG.info( 420 "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)"); 421 sendRejectionReasonToRingBuffer(() -> "The number of RegionServers " + cs.getNumServers() 422 + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null); 423 return false; 424 } 425 426 if (areSomeRegionReplicasColocatedOnHost(cluster)) { 427 LOG.info("Running balancer because at least one server 
hosts replicas of the same region." 428 + " function cost={}", functionCost()); 429 return true; 430 } 431 432 if (areSomeRegionReplicasColocatedOnRack(cluster)) { 433 LOG.info("Running balancer because at least one rack hosts replicas of the same region." 434 + " function cost={}", functionCost()); 435 return true; 436 } 437 438 if (idleRegionServerExist(cluster)) { 439 LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}", 440 functionCost()); 441 return true; 442 } 443 444 if ( 445 // table isolation is inherently incompatible with naive "sloppy server" checks 446 !balancerConditionals.isTableIsolationEnabled() && sloppyRegionServerExist(cs) 447 ) { 448 LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}", 449 functionCost()); 450 return true; 451 } 452 453 if (balancerConditionals.shouldRunBalancer(cluster)) { 454 LOG.info("Running balancer because conditional candidate generators have important moves"); 455 return true; 456 } 457 458 double total = 0.0; 459 float localSumMultiplier = 0; // in case this.sumMultiplier is not initialized 460 for (CostFunction c : costFunctions) { 461 if (!c.isNeeded()) { 462 LOG.trace("{} not needed", c.getClass().getSimpleName()); 463 continue; 464 } 465 total += c.cost() * c.getMultiplier(); 466 localSumMultiplier += c.getMultiplier(); 467 } 468 sumMultiplier = localSumMultiplier; 469 boolean balanced = (total / sumMultiplier < minCostNeedBalance); 470 471 if (balanced) { 472 final double calculatedTotal = total; 473 sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, sumMultiplier), 474 costFunctions); 475 LOG.info( 476 "{} - skipping load balancing because weighted average imbalance={} <= " 477 + "threshold({}) and conditionals do not have opinionated move candidates. 
" 478 + "If you want more aggressive balancing, either lower " 479 + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative " 480 + "multiplier(s) of the specific cost function(s). functionCost={}", 481 isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier, 482 minCostNeedBalance, minCostNeedBalance, functionCost()); 483 } else { 484 LOG.info( 485 "{} - Calculating plan. may take up to {}ms to complete. currentCost={}, targetCost={}", 486 isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime, total, 487 minCostNeedBalance); 488 } 489 return !balanced; 490 } 491 492 @RestrictedApi(explanation = "Should only be called in tests", link = "", 493 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 494 Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState cluster) { 495 CandidateGenerator generator = getRandomGenerator(cluster); 496 return Pair.newPair(generator, generator.generate(cluster)); 497 } 498 499 /** 500 * Select the candidate generator to use based on the cost of cost functions. The chance of 501 * selecting a candidate generator is proportional to the share of cost of all cost functions 502 * among all cost functions that benefit from it. 503 */ 504 protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) { 505 // Prefer conditional generators if they have moves to make 506 if (balancerConditionals.isConditionalBalancingEnabled()) { 507 for (RegionPlanConditional conditional : balancerConditionals.getConditionals()) { 508 List<RegionPlanConditionalCandidateGenerator> generators = 509 conditional.getCandidateGenerators(); 510 for (RegionPlanConditionalCandidateGenerator generator : generators) { 511 if (generator.getWeight(cluster) > 0) { 512 return generator; 513 } 514 } 515 } 516 } 517 518 List<Class<? 
extends CandidateGenerator>> generatorClasses = shuffledGeneratorClasses.get(); 519 List<Double> partialSums = new ArrayList<>(generatorClasses.size()); 520 double sum = 0.0; 521 for (Class<? extends CandidateGenerator> clazz : generatorClasses) { 522 double weight = weightsOfGenerators.getOrDefault(clazz, 0.0); 523 sum += weight; 524 partialSums.add(sum); 525 } 526 527 // If the sum of all weights is zero, fall back to any generator 528 if (sum == 0.0) { 529 return pickAnyGenerator(generatorClasses); 530 } 531 532 double rand = ThreadLocalRandom.current().nextDouble(); 533 // Normalize partial sums so that the last one should be exactly 1.0 534 for (int i = 0; i < partialSums.size(); i++) { 535 partialSums.set(i, partialSums.get(i) / sum); 536 } 537 538 // Generate a random number and pick the first generator whose partial sum is >= rand 539 for (int i = 0; i < partialSums.size(); i++) { 540 if (rand <= partialSums.get(i)) { 541 return candidateGenerators.get(generatorClasses.get(i)); 542 } 543 } 544 545 // Fallback: if for some reason we didn't return above, return any generator 546 return pickAnyGenerator(generatorClasses); 547 } 548 549 private CandidateGenerator 550 pickAnyGenerator(List<Class<? extends CandidateGenerator>> generatorClasses) { 551 Class<? extends CandidateGenerator> randomClass = 552 generatorClasses.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size())); 553 return candidateGenerators.get(randomClass); 554 } 555 556 @RestrictedApi(explanation = "Should only be called in tests", link = "", 557 allowedOnPath = ".*/src/test/.*") 558 void setRackManager(RackManager rackManager) { 559 this.rackManager = rackManager; 560 } 561 562 private long calculateMaxSteps(BalancerClusterState cluster) { 563 return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers; 564 } 565 566 /** 567 * Given the cluster state this will try and approach an optimal balance. 
This should always 568 * approach the optimal state given enough steps. 569 */ 570 @Override 571 protected List<RegionPlan> balanceTable(TableName tableName, 572 Map<ServerName, List<RegionInfo>> loadOfOneTable) { 573 // On clusters with lots of HFileLinks or lots of reference files, 574 // instantiating the storefile infos can be quite expensive. 575 // Allow turning this feature off if the locality cost is not going to 576 // be used in any computations. 577 RegionHDFSBlockLocationFinder finder = null; 578 if ((this.localityCost != null) || (this.rackLocalityCost != null)) { 579 finder = this.regionFinder; 580 } 581 582 // The clusterState that is given to this method contains the state 583 // of all the regions in the table(s) (that's true today) 584 // Keep track of servers to iterate through them. 585 BalancerClusterState cluster = createState(loadOfOneTable, loads, finder, rackManager); 586 587 long startTime = EnvironmentEdgeManager.currentTime(); 588 cluster.setStopRequestedAt(startTime + maxRunningTime); 589 590 initCosts(cluster); 591 balancerConditionals.loadClusterState(cluster); 592 balancerConditionals.clearConditionalWeightCaches(); 593 594 float localSumMultiplier = 0; 595 for (CostFunction c : costFunctions) { 596 if (c.isNeeded()) { 597 localSumMultiplier += c.getMultiplier(); 598 } 599 } 600 sumMultiplier = localSumMultiplier; 601 if (sumMultiplier <= 0) { 602 LOG.error("At least one cost function needs a multiplier > 0. 
For example, set " 603 + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default"); 604 return null; 605 } 606 607 double currentCost = computeCost(cluster, Double.MAX_VALUE); 608 curOverallCost = currentCost; 609 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 610 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 611 double initCost = currentCost; 612 double newCost; 613 614 if (!needsBalance(tableName, cluster)) { 615 return null; 616 } 617 618 long computedMaxSteps; 619 if (runMaxSteps) { 620 computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster)); 621 } else { 622 long calculatedMaxSteps = calculateMaxSteps(cluster); 623 computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps); 624 if (calculatedMaxSteps > maxSteps) { 625 LOG.warn( 626 "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than " 627 + "maxSteps:{}. Hence load balancing may not work well. Setting parameter " 628 + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue." 629 + "(This config change does not require service restart)", 630 calculatedMaxSteps, maxSteps); 631 } 632 } 633 LOG.info( 634 "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, " 635 + "functionCost={} computedMaxSteps={}", 636 currentCost / sumMultiplier, functionCost(), computedMaxSteps); 637 638 final String initFunctionTotalCosts = totalCostsPerFunc(); 639 // Perform a stochastic walk to see if we can get a good fit. 640 long step; 641 boolean planImprovedConditionals = false; 642 Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new HashMap<>(); 643 Map<Class<? 
extends CandidateGenerator>, Long> generatorToApprovedActionCount = new HashMap<>(); 644 for (step = 0; step < computedMaxSteps; step++) { 645 Pair<CandidateGenerator, BalanceAction> nextAction = nextAction(cluster); 646 CandidateGenerator generator = nextAction.getFirst(); 647 BalanceAction action = nextAction.getSecond(); 648 649 if (action.getType() == BalanceAction.Type.NULL) { 650 continue; 651 } 652 653 int conditionalViolationsChange = 0; 654 boolean isViolatingConditionals = false; 655 boolean moveImprovedConditionals = false; 656 // Only check conditionals if they are enabled 657 if (balancerConditionals.isConditionalBalancingEnabled()) { 658 // Always accept a conditional generator output. Sometimes conditional generators 659 // may need to make controversial moves in order to break what would otherwise 660 // be a deadlocked situation. 661 // Otherwise, for normal moves, evaluate the action. 662 if (RegionPlanConditionalCandidateGenerator.class.isAssignableFrom(generator.getClass())) { 663 conditionalViolationsChange = -1; 664 } else { 665 conditionalViolationsChange = 666 balancerConditionals.getViolationCountChange(cluster, action); 667 isViolatingConditionals = balancerConditionals.isViolating(cluster, action); 668 } 669 moveImprovedConditionals = conditionalViolationsChange < 0; 670 if (moveImprovedConditionals) { 671 planImprovedConditionals = true; 672 } 673 } 674 675 // Change state and evaluate costs 676 try { 677 cluster.doAction(action); 678 } catch (IllegalStateException | ArrayIndexOutOfBoundsException e) { 679 LOG.warn( 680 "Generator {} produced invalid action! " 681 + "Debug your candidate generator as this is likely a bug, " 682 + "and may cause a balancer deadlock. 
{}", 683 generator.getClass().getSimpleName(), action, e); 684 continue; 685 } 686 updateCostsAndWeightsWithAction(cluster, action); 687 generatorToStepCount.merge(generator.getClass(), action.getStepCount(), Long::sum); 688 689 newCost = computeCost(cluster, currentCost); 690 691 if (LOG.isDebugEnabled() && action.getType() == BalanceAction.Type.MOVE_REGION) { 692 LOG.debug( 693 "action moving region {} from {} to {} with cost {}. currentCost={}, functionCost={}", 694 cluster.regions[((MoveRegionAction) action).getRegion()].getEncodedName(), 695 cluster.servers[((MoveRegionAction) action).getFromServer()].getServerName(), 696 cluster.servers[((MoveRegionAction) action).getToServer()].getServerName(), newCost, 697 currentCost, functionCost()); 698 } 699 700 double costImprovement = currentCost - newCost; 701 double minimumImprovement = 702 Math.max(CostFunction.getCostEpsilon(currentCost), CostFunction.getCostEpsilon(newCost)); 703 boolean costsImproved = costImprovement > minimumImprovement; 704 boolean conditionalsSimilarCostsImproved = 705 (costsImproved && conditionalViolationsChange == 0 && !isViolatingConditionals); 706 // Our first priority is to reduce conditional violations 707 // Our second priority is to reduce balancer cost 708 // change, regardless of cost change 709 if (moveImprovedConditionals || conditionalsSimilarCostsImproved) { 710 currentCost = newCost; 711 generatorToApprovedActionCount.merge(generator.getClass(), action.getStepCount(), 712 Long::sum); 713 714 // save for JMX 715 curOverallCost = currentCost; 716 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 717 } else { 718 // Put things back the way they were before. 
719 // TODO: undo by remembering old values 720 BalanceAction undoAction = action.undoAction(); 721 cluster.doAction(undoAction); 722 updateCostsAndWeightsWithAction(cluster, undoAction); 723 } 724 725 if (cluster.isStopRequested()) { 726 break; 727 } 728 } 729 long endTime = EnvironmentEdgeManager.currentTime(); 730 731 StringJoiner joiner = new StringJoiner("\n"); 732 joiner.add("CandidateGenerator activity summary:"); 733 generatorToStepCount.forEach((generator, count) -> { 734 long approvals = generatorToApprovedActionCount.getOrDefault(generator, 0L); 735 joiner.add(String.format(" - %s: %d steps, %d approvals", generator.getSimpleName(), count, 736 approvals)); 737 }); 738 LOG.debug(joiner.toString()); 739 740 metricsBalancer.balanceCluster(endTime - startTime); 741 742 if (planImprovedConditionals || (initCost > currentCost)) { 743 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 744 List<RegionPlan> plans = createRegionPlans(cluster); 745 LOG.info( 746 "Finished computing new moving plan. Computation took {} ms" 747 + " to try {} different iterations. Found a solution that moves " 748 + "{} regions; Going from a computed imbalance of {}" 749 + " to a new imbalance of {}. funtionCost={}", 750 endTime - startTime, step, plans.size(), initCost / sumMultiplier, 751 currentCost / sumMultiplier, functionCost()); 752 sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step); 753 return plans; 754 } 755 LOG.info( 756 "Could not find a better moving plan. 
Tried {} different configurations in " 757 + "{} ms, and did not find anything with an imbalance score less than {} " 758 + "and could not improve conditional violations", 759 step, endTime - startTime, initCost / sumMultiplier); 760 return null; 761 } 762 763 protected void sendRejectionReasonToRingBuffer(Supplier<String> reason, 764 List<CostFunction> costFunctions) { 765 provider.recordBalancerRejection(() -> { 766 BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get()); 767 if (costFunctions != null) { 768 for (CostFunction c : costFunctions) { 769 if (!c.isNeeded()) { 770 continue; 771 } 772 builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier()); 773 } 774 } 775 return builder.build(); 776 }); 777 } 778 779 private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost, 780 double initCost, String initFunctionTotalCosts, long step) { 781 provider.recordBalancerDecision(() -> { 782 List<String> regionPlans = new ArrayList<>(); 783 for (RegionPlan plan : plans) { 784 regionPlans 785 .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName() 786 + " , source: " + plan.getSource() + " , destination: " + plan.getDestination()); 787 } 788 return new BalancerDecision.Builder().setInitTotalCost(initCost) 789 .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost) 790 .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step) 791 .setRegionPlans(regionPlans).build(); 792 }); 793 } 794 795 /** 796 * update costs to JMX 797 */ 798 private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) { 799 if (tableName == null) { 800 return; 801 } 802 803 // check if the metricsBalancer is MetricsStochasticBalancer before casting 804 if (metricsBalancer instanceof MetricsStochasticBalancer) { 805 MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer; 806 // overall cost 807 
balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME, 808 "Overall cost", overall); 809 810 // each cost function 811 for (int i = 0; i < costFunctions.size(); i++) { 812 CostFunction costFunction = costFunctions.get(i); 813 String costFunctionName = costFunction.getClass().getSimpleName(); 814 double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall); 815 // TODO: cost function may need a specific description 816 balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName, 817 "The percent of " + costFunctionName, costPercent); 818 } 819 } 820 } 821 822 private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) { 823 float multiplier = costFunction.getMultiplier(); 824 if (multiplier > 0) { 825 costFunctions.add(costFunction); 826 } 827 } 828 829 protected String functionCost() { 830 StringBuilder builder = new StringBuilder(); 831 for (CostFunction c : costFunctions) { 832 builder.append(c.getClass().getSimpleName()); 833 builder.append(" : ("); 834 if (c.isNeeded()) { 835 builder.append("multiplier=" + c.getMultiplier()); 836 builder.append(", "); 837 double cost = c.cost(); 838 builder.append("imbalance=" + cost); 839 if (cost >= minCostNeedBalance) { 840 builder.append(", need balance"); 841 } 842 } else { 843 builder.append("not needed"); 844 } 845 builder.append("); "); 846 } 847 return builder.toString(); 848 } 849 850 @RestrictedApi(explanation = "Should only be called in tests", link = "", 851 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 852 List<CostFunction> getCostFunctions() { 853 return costFunctions; 854 } 855 856 private String totalCostsPerFunc() { 857 StringBuilder builder = new StringBuilder(); 858 for (CostFunction c : costFunctions) { 859 if (!c.isNeeded()) { 860 continue; 861 } 862 double cost = c.getMultiplier() * c.cost(); 863 if (cost > 0.0) { 864 builder.append(" "); 865 builder.append(c.getClass().getSimpleName()); 866 
builder.append(" : "); 867 builder.append(cost); 868 builder.append(";"); 869 } 870 } 871 if (builder.length() > 0) { 872 builder.deleteCharAt(builder.length() - 1); 873 } 874 return builder.toString(); 875 } 876 877 /** 878 * Create all of the RegionPlan's needed to move from the initial cluster state to the desired 879 * state. 880 * @param cluster The state of the cluster 881 * @return List of RegionPlan's that represent the moves needed to get to desired final state. 882 */ 883 private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) { 884 List<RegionPlan> plans = new ArrayList<>(); 885 for (int regionIndex = 0; regionIndex 886 < cluster.regionIndexToServerIndex.length; regionIndex++) { 887 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex]; 888 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex]; 889 890 if (initialServerIndex != newServerIndex) { 891 RegionInfo region = cluster.regions[regionIndex]; 892 ServerName initialServer = cluster.servers[initialServerIndex]; 893 ServerName newServer = cluster.servers[newServerIndex]; 894 895 if (LOG.isTraceEnabled()) { 896 LOG.trace("Moving Region " + region.getEncodedName() + " from server " 897 + initialServer.getHostname() + " to " + newServer.getHostname()); 898 } 899 RegionPlan rp = new RegionPlan(region, initialServer, newServer); 900 plans.add(rp); 901 } 902 } 903 return plans; 904 } 905 906 /** 907 * Store the current region loads. 908 */ 909 private void updateRegionLoad() { 910 // We create a new hashmap so that regions that are no longer there are removed. 911 // However we temporarily need the old loads so we can use them to keep the rolling average. 
912 Map<String, Deque<BalancerRegionLoad>> oldLoads = loads; 913 loads = new HashMap<>(); 914 915 clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> { 916 sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> { 917 String regionNameAsString = RegionInfo.getRegionNameAsString(regionName); 918 Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString); 919 if (rLoads == null) { 920 rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1); 921 } else if (rLoads.size() >= numRegionLoadsToRemember) { 922 rLoads.remove(); 923 } 924 rLoads.add(new BalancerRegionLoad(rm)); 925 loads.put(regionNameAsString, rLoads); 926 }); 927 }); 928 } 929 930 @RestrictedApi(explanation = "Should only be called in tests", link = "", 931 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 932 void initCosts(BalancerClusterState cluster) { 933 weightsOfGenerators.clear(); 934 for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) { 935 weightsOfGenerators.put(clazz, 0.0); 936 } 937 for (CostFunction c : costFunctions) { 938 c.prepare(cluster); 939 c.updateWeight(weightsOfGenerators); 940 } 941 } 942 943 /** 944 * Update both the costs of costfunctions and the weights of candidate generators 945 */ 946 @RestrictedApi(explanation = "Should only be called in tests", link = "", 947 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 948 void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) { 949 // Reset all the weights to 0 950 for (Class<? 
extends CandidateGenerator> clazz : candidateGenerators.keySet()) { 951 weightsOfGenerators.put(clazz, 0.0); 952 } 953 for (CostFunction c : costFunctions) { 954 if (c.isNeeded()) { 955 c.postAction(action); 956 c.updateWeight(weightsOfGenerators); 957 } 958 } 959 } 960 961 /** 962 * Get the names of the cost functions 963 */ 964 @RestrictedApi(explanation = "Should only be called in tests", link = "", 965 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 966 String[] getCostFunctionNames() { 967 String[] ret = new String[costFunctions.size()]; 968 for (int i = 0; i < costFunctions.size(); i++) { 969 CostFunction c = costFunctions.get(i); 970 ret[i] = c.getClass().getSimpleName(); 971 } 972 973 return ret; 974 } 975 976 /** 977 * This is the main cost function. It will compute a cost associated with a proposed cluster 978 * state. All different costs will be combined with their multipliers to produce a double cost. 979 * @param cluster The state of the cluster 980 * @param previousCost the previous cost. This is used as an early out. 981 * @return a double of a cost associated with the proposed cluster state. This cost is an 982 * aggregate of all individual cost functions. 
983 */ 984 @RestrictedApi(explanation = "Should only be called in tests", link = "", 985 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 986 double computeCost(BalancerClusterState cluster, double previousCost) { 987 double total = 0; 988 989 for (int i = 0; i < costFunctions.size(); i++) { 990 CostFunction c = costFunctions.get(i); 991 this.tempFunctionCosts[i] = 0.0; 992 993 if (!c.isNeeded()) { 994 continue; 995 } 996 997 Float multiplier = c.getMultiplier(); 998 double cost = c.cost(); 999 1000 this.tempFunctionCosts[i] = multiplier * cost; 1001 total += this.tempFunctionCosts[i]; 1002 1003 if (total > previousCost) { 1004 break; 1005 } 1006 } 1007 1008 return total; 1009 } 1010 1011 /** 1012 * A helper function to compose the attribute name from tablename and costfunction name 1013 */ 1014 static String composeAttributeName(String tableName, String costFunctionName) { 1015 return tableName + TABLE_FUNCTION_SEP + costFunctionName; 1016 } 1017}