001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.lang.reflect.Constructor; 022import java.util.ArrayDeque; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collections; 026import java.util.Deque; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.StringJoiner; 031import java.util.concurrent.ThreadLocalRandom; 032import java.util.concurrent.TimeUnit; 033import java.util.function.Supplier; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.ClusterMetrics; 036import org.apache.hadoop.hbase.HBaseInterfaceAudience; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.RegionMetrics; 039import org.apache.hadoop.hbase.ServerMetrics; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.BalancerDecision; 043import org.apache.hadoop.hbase.client.BalancerRejection; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.master.RackManager; 046import org.apache.hadoop.hbase.master.RegionPlan; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.hadoop.hbase.util.Pair; 049import org.apache.hadoop.hbase.util.ReflectionUtils; 050import org.apache.yetus.audience.InterfaceAudience; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 055import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; 056 057/** 058 * <p> 059 * This is a best effort load balancer. Given a Cost function F(C) => x It will randomly try and 060 * mutate the cluster to Cprime. If F(Cprime) < F(C) then the new cluster state becomes the plan. 061 * It includes costs functions to compute the cost of: 062 * </p> 063 * <ul> 064 * <li>Region Load</li> 065 * <li>Table Load</li> 066 * <li>Data Locality</li> 067 * <li>Memstore Sizes</li> 068 * <li>Storefile Sizes</li> 069 * </ul> 070 * <p> 071 * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best 072 * solution, and 1 is the highest possible cost and the worst solution. The computed costs are 073 * scaled by their respective multipliers: 074 * </p> 075 * <ul> 076 * <li>hbase.master.balancer.stochastic.regionLoadCost</li> 077 * <li>hbase.master.balancer.stochastic.moveCost</li> 078 * <li>hbase.master.balancer.stochastic.tableLoadCost</li> 079 * <li>hbase.master.balancer.stochastic.localityCost</li> 080 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li> 081 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li> 082 * </ul> 083 * <p> 084 * You can also add custom Cost function by setting the the following configuration value: 085 * </p> 086 * <ul> 087 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li> 088 * </ul> 089 * <p> 090 * All custom Cost Functions needs to extends {@link CostFunction} 091 * </p> 092 * <p> 093 * In addition to the above configurations, the balancer can be tuned by the following configuration 094 * values: 095 * </p> 096 * <ul> 097 * <li>hbase.master.balancer.stochastic.maxMoveRegions which controls what the max number of regions 098 * that can be moved in a single invocation of this balancer.</li> 099 * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of 100 * regions is multiplied to try and get the number of times the balancer will mutate all 101 * servers.</li> 102 * <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that the 103 * balancer will try and mutate all the servers. The balancer will use the minimum of this value and 104 * the above computation.</li> 105 * </ul> 106 * <p> 107 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the 108 * balancer gets the full picture of all loads on the cluster. 109 * </p> 110 */ 111@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 112public class StochasticLoadBalancer extends BaseLoadBalancer { 113 114 private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class); 115 116 protected static final String STEPS_PER_REGION_KEY = 117 "hbase.master.balancer.stochastic.stepsPerRegion"; 118 protected static final int DEFAULT_STEPS_PER_REGION = 800; 119 protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps"; 120 protected static final int DEFAULT_MAX_STEPS = 1000000; 121 protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps"; 122 protected static final boolean DEFAULT_RUN_MAX_STEPS = false; 123 protected static final String MAX_RUNNING_TIME_KEY = 124 "hbase.master.balancer.stochastic.maxRunningTime"; 125 protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds. 126 protected static final String KEEP_REGION_LOADS = 127 "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; 128 protected static final int DEFAULT_KEEP_REGION_LOADS = 15; 129 private static final String TABLE_FUNCTION_SEP = "_"; 130 protected static final String MIN_COST_NEED_BALANCE_KEY = 131 "hbase.master.balancer.stochastic.minCostNeedBalance"; 132 protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f; 133 protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY = 134 "hbase.master.balancer.stochastic.additionalCostFunctions"; 135 public static final String OVERALL_COST_FUNCTION_NAME = "Overall"; 136 137 Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>(); 138 139 // values are defaults 140 private int maxSteps = DEFAULT_MAX_STEPS; 141 private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS; 142 private int stepsPerRegion = DEFAULT_STEPS_PER_REGION; 143 private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME; 144 private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS; 145 private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE; 146 Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>(); 147 148 protected List<CostFunction> costFunctions; // FindBugs: Wants this protected; 149 // IS2_INCONSISTENT_SYNC 150 // To save currently configed sum of multiplier. Defaulted at 1 for cases that carry high cost 151 private float sumMultiplier; 152 // to save and report costs to JMX 153 private double curOverallCost = 0d; 154 private double[] tempFunctionCosts; 155 private double[] curFunctionCosts; 156 157 // Keep locality based picker and cost function to alert them 158 // when new services are offered 159 private LocalityBasedCandidateGenerator localityCandidateGenerator; 160 private ServerLocalityCostFunction localityCost; 161 private RackLocalityCostFunction rackLocalityCost; 162 private RegionReplicaHostCostFunction regionReplicaHostCostFunction; 163 private RegionReplicaRackCostFunction regionReplicaRackCostFunction; 164 165 private final Map<Class<? extends CandidateGenerator>, Double> weightsOfGenerators = 166 new HashMap<>(); 167 protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators; 168 protected final Supplier<List<Class<? extends CandidateGenerator>>> shuffledGeneratorClasses = 169 Suppliers.memoizeWithExpiration(() -> { 170 List<Class<? extends CandidateGenerator>> shuffled = 171 new ArrayList<>(candidateGenerators.keySet()); 172 Collections.shuffle(shuffled); 173 return shuffled; 174 }, 5, TimeUnit.SECONDS); 175 176 /** 177 * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its 178 * default MetricsBalancer 179 */ 180 public StochasticLoadBalancer() { 181 super(new MetricsStochasticBalancer()); 182 } 183 184 @RestrictedApi(explanation = "Should only be called in tests", link = "", 185 allowedOnPath = ".*/src/test/.*") 186 public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) { 187 super(metricsStochasticBalancer); 188 } 189 190 private static CostFunction createCostFunction(Class<? extends CostFunction> clazz, 191 Configuration conf) { 192 try { 193 Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class); 194 return ReflectionUtils.instantiate(clazz.getName(), ctor, conf); 195 } catch (NoSuchMethodException e) { 196 // will try construct with no parameter 197 } 198 return ReflectionUtils.newInstance(clazz); 199 } 200 201 private void loadCustomCostFunctions(Configuration conf) { 202 String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY); 203 204 if (null == functionsNames) { 205 return; 206 } 207 for (String className : functionsNames) { 208 Class<? extends CostFunction> clazz; 209 try { 210 clazz = Class.forName(className).asSubclass(CostFunction.class); 211 } catch (ClassNotFoundException e) { 212 LOG.warn("Cannot load class '{}': {}", className, e.getMessage()); 213 continue; 214 } 215 CostFunction func = createCostFunction(clazz, conf); 216 LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName()); 217 costFunctions.add(func); 218 } 219 } 220 221 @RestrictedApi(explanation = "Should only be called in tests", link = "", 222 allowedOnPath = ".*/src/test/.*") 223 Map<Class<? extends CandidateGenerator>, CandidateGenerator> getCandidateGenerators() { 224 return this.candidateGenerators; 225 } 226 227 protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> 228 createCandidateGenerators() { 229 Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators = 230 new HashMap<>(5); 231 candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator()); 232 candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator()); 233 candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator); 234 candidateGenerators.put(RegionReplicaCandidateGenerator.class, 235 new RegionReplicaCandidateGenerator()); 236 candidateGenerators.put(RegionReplicaRackCandidateGenerator.class, 237 new RegionReplicaRackCandidateGenerator()); 238 return candidateGenerators; 239 } 240 241 protected List<CostFunction> createCostFunctions(Configuration conf) { 242 List<CostFunction> costFunctions = new ArrayList<>(); 243 addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf)); 244 addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf)); 245 addCostFunction(costFunctions, new MoveCostFunction(conf, provider)); 246 addCostFunction(costFunctions, localityCost); 247 addCostFunction(costFunctions, rackLocalityCost); 248 addCostFunction(costFunctions, new TableSkewCostFunction(conf)); 249 addCostFunction(costFunctions, regionReplicaHostCostFunction); 250 addCostFunction(costFunctions, regionReplicaRackCostFunction); 251 addCostFunction(costFunctions, new ReadRequestCostFunction(conf)); 252 addCostFunction(costFunctions, new CPRequestCostFunction(conf)); 253 addCostFunction(costFunctions, new WriteRequestCostFunction(conf)); 254 addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf)); 255 addCostFunction(costFunctions, new StoreFileCostFunction(conf)); 256 return costFunctions; 257 } 258 259 @Override 260 protected void loadConf(Configuration conf) { 261 super.loadConf(conf); 262 maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS); 263 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION); 264 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME); 265 runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS); 266 267 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS); 268 minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE); 269 localityCandidateGenerator = new LocalityBasedCandidateGenerator(); 270 localityCost = new ServerLocalityCostFunction(conf); 271 rackLocalityCost = new RackLocalityCostFunction(conf); 272 273 this.candidateGenerators = createCandidateGenerators(); 274 275 regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); 276 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); 277 this.costFunctions = createCostFunctions(conf); 278 loadCustomCostFunctions(conf); 279 280 curFunctionCosts = new double[costFunctions.size()]; 281 tempFunctionCosts = new double[costFunctions.size()]; 282 283 LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps 284 + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable=" 285 + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames()) 286 + " , sum of multiplier of cost functions = " + sumMultiplier + " etc."); 287 } 288 289 @Override 290 public void updateClusterMetrics(ClusterMetrics st) { 291 super.updateClusterMetrics(st); 292 updateRegionLoad(); 293 294 // update metrics size 295 try { 296 // by-table or ensemble mode 297 int tablesCount = isByTable ? provider.getNumberOfTables() : 1; 298 int functionsCount = getCostFunctionNames().length; 299 300 updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall 301 } catch (Exception e) { 302 LOG.error("failed to get the size of all tables", e); 303 } 304 } 305 306 private void updateBalancerTableLoadInfo(TableName tableName, 307 Map<ServerName, List<RegionInfo>> loadOfOneTable) { 308 RegionHDFSBlockLocationFinder finder = null; 309 if ((this.localityCost != null) || (this.rackLocalityCost != null)) { 310 finder = this.regionFinder; 311 } 312 BalancerClusterState cluster = 313 new BalancerClusterState(loadOfOneTable, loads, finder, rackManager); 314 315 initCosts(cluster); 316 curOverallCost = computeCost(cluster, Double.MAX_VALUE); 317 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 318 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 319 } 320 321 @Override 322 public void 323 updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) { 324 if (isByTable) { 325 loadOfAllTable.forEach((tableName, loadOfOneTable) -> { 326 updateBalancerTableLoadInfo(tableName, loadOfOneTable); 327 }); 328 } else { 329 updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME, 330 toEnsumbleTableLoad(loadOfAllTable)); 331 } 332 } 333 334 /** 335 * Update the number of metrics that are reported to JMX 336 */ 337 @RestrictedApi(explanation = "Should only be called in tests", link = "", 338 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 339 void updateMetricsSize(int size) { 340 if (metricsBalancer instanceof MetricsStochasticBalancer) { 341 ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size); 342 } 343 } 344 345 private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c) { 346 if (c.numHosts >= c.maxReplicas) { 347 regionReplicaHostCostFunction.prepare(c); 348 double hostCost = Math.abs(regionReplicaHostCostFunction.cost()); 349 boolean colocatedAtHost = hostCost > CostFunction.getCostEpsilon(hostCost); 350 if (colocatedAtHost) { 351 return true; 352 } 353 LOG.trace("No host colocation detected with host cost={}", hostCost); 354 } 355 return false; 356 } 357 358 private boolean areSomeRegionReplicasColocatedOnRack(BalancerClusterState c) { 359 if (c.numRacks >= c.maxReplicas) { 360 regionReplicaRackCostFunction.prepare(c); 361 double rackCost = Math.abs(regionReplicaRackCostFunction.cost()); 362 boolean colocatedAtRack = rackCost > CostFunction.getCostEpsilon(rackCost); 363 if (colocatedAtRack) { 364 return true; 365 } 366 LOG.trace("No rack colocation detected with rack cost={}", rackCost); 367 } else { 368 LOG.trace("Rack colocation is inevitable with fewer racks than replicas, " 369 + "so we won't bother checking"); 370 } 371 return false; 372 } 373 374 private String getBalanceReason(double total, double sumMultiplier) { 375 if (total <= 0) { 376 return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0"; 377 } else if (sumMultiplier <= 0) { 378 return "sumMultiplier = " + sumMultiplier + " <= 0"; 379 } else if ((total / sumMultiplier) < minCostNeedBalance) { 380 return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " 381 + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")"; 382 } else { 383 return ""; 384 } 385 } 386 387 @RestrictedApi(explanation = "Should only be called in tests", link = "", 388 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 389 boolean needsBalance(TableName tableName, BalancerClusterState cluster) { 390 ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); 391 if (cs.getNumServers() < MIN_SERVER_BALANCE) { 392 LOG.info( 393 "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)"); 394 sendRejectionReasonToRingBuffer(() -> "The number of RegionServers " + cs.getNumServers() 395 + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null); 396 return false; 397 } 398 399 if (areSomeRegionReplicasColocatedOnHost(cluster)) { 400 LOG.info("Running balancer because at least one server hosts replicas of the same region." 401 + " function cost={}", functionCost()); 402 return true; 403 } 404 405 if (areSomeRegionReplicasColocatedOnRack(cluster)) { 406 LOG.info("Running balancer because at least one rack hosts replicas of the same region." 407 + " function cost={}", functionCost()); 408 return true; 409 } 410 411 if (idleRegionServerExist(cluster)) { 412 LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}", 413 functionCost()); 414 return true; 415 } 416 417 if (sloppyRegionServerExist(cs)) { 418 LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}", 419 functionCost()); 420 return true; 421 } 422 423 double total = 0.0; 424 float localSumMultiplier = 0; // in case this.sumMultiplier is not initialized 425 for (CostFunction c : costFunctions) { 426 if (!c.isNeeded()) { 427 LOG.trace("{} not needed", c.getClass().getSimpleName()); 428 continue; 429 } 430 total += c.cost() * c.getMultiplier(); 431 localSumMultiplier += c.getMultiplier(); 432 } 433 sumMultiplier = localSumMultiplier; 434 boolean balanced = (total / sumMultiplier < minCostNeedBalance); 435 436 if (balanced) { 437 final double calculatedTotal = total; 438 sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, sumMultiplier), 439 costFunctions); 440 LOG.info( 441 "{} - skipping load balancing because weighted average imbalance={} <= " 442 + "threshold({}). If you want more aggressive balancing, either lower " 443 + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative " 444 + "multiplier(s) of the specific cost function(s). functionCost={}", 445 isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier, 446 minCostNeedBalance, minCostNeedBalance, functionCost()); 447 } else { 448 LOG.info("{} - Calculating plan. may take up to {}ms to complete.", 449 isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime); 450 } 451 return !balanced; 452 } 453 454 @RestrictedApi(explanation = "Should only be called in tests", link = "", 455 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 456 Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState cluster) { 457 CandidateGenerator generator = getRandomGenerator(); 458 return Pair.newPair(generator, generator.generate(cluster)); 459 } 460 461 /** 462 * Select the candidate generator to use based on the cost of cost functions. The chance of 463 * selecting a candidate generator is proportional to the share of cost of all cost functions 464 * among all cost functions that benefit from it. 465 */ 466 protected CandidateGenerator getRandomGenerator() { 467 Preconditions.checkState(!candidateGenerators.isEmpty(), "No candidate generators available."); 468 List<Class<? extends CandidateGenerator>> generatorClasses = shuffledGeneratorClasses.get(); 469 List<Double> partialSums = new ArrayList<>(generatorClasses.size()); 470 double sum = 0.0; 471 for (Class<? extends CandidateGenerator> clazz : generatorClasses) { 472 double weight = weightsOfGenerators.getOrDefault(clazz, 0.0); 473 sum += weight; 474 partialSums.add(sum); 475 } 476 477 // If the sum of all weights is zero, fall back to any generator 478 if (sum == 0.0) { 479 return pickAnyGenerator(generatorClasses); 480 } 481 482 double rand = ThreadLocalRandom.current().nextDouble(); 483 // Normalize partial sums so that the last one should be exactly 1.0 484 for (int i = 0; i < partialSums.size(); i++) { 485 partialSums.set(i, partialSums.get(i) / sum); 486 } 487 488 // Generate a random number and pick the first generator whose partial sum is >= rand 489 for (int i = 0; i < partialSums.size(); i++) { 490 if (rand <= partialSums.get(i)) { 491 return candidateGenerators.get(generatorClasses.get(i)); 492 } 493 } 494 495 // Fallback: if for some reason we didn't return above, return any generator 496 return pickAnyGenerator(generatorClasses); 497 } 498 499 private CandidateGenerator 500 pickAnyGenerator(List<Class<? extends CandidateGenerator>> generatorClasses) { 501 Class<? extends CandidateGenerator> randomClass = 502 generatorClasses.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size())); 503 return candidateGenerators.get(randomClass); 504 } 505 506 @RestrictedApi(explanation = "Should only be called in tests", link = "", 507 allowedOnPath = ".*/src/test/.*") 508 void setRackManager(RackManager rackManager) { 509 this.rackManager = rackManager; 510 } 511 512 private long calculateMaxSteps(BalancerClusterState cluster) { 513 return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers; 514 } 515 516 /** 517 * Given the cluster state this will try and approach an optimal balance. This should always 518 * approach the optimal state given enough steps. 519 */ 520 @Override 521 protected List<RegionPlan> balanceTable(TableName tableName, 522 Map<ServerName, List<RegionInfo>> loadOfOneTable) { 523 // On clusters with lots of HFileLinks or lots of reference files, 524 // instantiating the storefile infos can be quite expensive. 525 // Allow turning this feature off if the locality cost is not going to 526 // be used in any computations. 527 RegionHDFSBlockLocationFinder finder = null; 528 if ((this.localityCost != null) || (this.rackLocalityCost != null)) { 529 finder = this.regionFinder; 530 } 531 532 // The clusterState that is given to this method contains the state 533 // of all the regions in the table(s) (that's true today) 534 // Keep track of servers to iterate through them. 535 BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder, 536 rackManager, regionCacheRatioOnOldServerMap); 537 538 long startTime = EnvironmentEdgeManager.currentTime(); 539 540 initCosts(cluster); 541 542 float localSumMultiplier = 0; 543 for (CostFunction c : costFunctions) { 544 if (c.isNeeded()) { 545 localSumMultiplier += c.getMultiplier(); 546 } 547 } 548 sumMultiplier = localSumMultiplier; 549 if (sumMultiplier <= 0) { 550 LOG.error("At least one cost function needs a multiplier > 0. For example, set " 551 + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default"); 552 return null; 553 } 554 555 double currentCost = computeCost(cluster, Double.MAX_VALUE); 556 curOverallCost = currentCost; 557 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 558 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 559 double initCost = currentCost; 560 double newCost; 561 562 if (!needsBalance(tableName, cluster)) { 563 return null; 564 } 565 566 long computedMaxSteps; 567 if (runMaxSteps) { 568 computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster)); 569 } else { 570 long calculatedMaxSteps = calculateMaxSteps(cluster); 571 computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps); 572 if (calculatedMaxSteps > maxSteps) { 573 LOG.warn( 574 "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than " 575 + "maxSteps:{}. Hence load balancing may not work well. Setting parameter " 576 + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue." 577 + "(This config change does not require service restart)", 578 calculatedMaxSteps, maxSteps); 579 } 580 } 581 LOG.info( 582 "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, " 583 + "functionCost={} computedMaxSteps={}", 584 currentCost / sumMultiplier, functionCost(), computedMaxSteps); 585 586 final String initFunctionTotalCosts = totalCostsPerFunc(); 587 // Perform a stochastic walk to see if we can get a good fit. 588 long step; 589 Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new HashMap<>(); 590 Map<Class<? extends CandidateGenerator>, Long> generatorToApprovedActionCount = new HashMap<>(); 591 for (step = 0; step < computedMaxSteps; step++) { 592 Pair<CandidateGenerator, BalanceAction> nextAction = nextAction(cluster); 593 CandidateGenerator generator = nextAction.getFirst(); 594 BalanceAction action = nextAction.getSecond(); 595 596 if (action.getType() == BalanceAction.Type.NULL) { 597 continue; 598 } 599 600 cluster.doAction(action); 601 updateCostsAndWeightsWithAction(cluster, action); 602 generatorToStepCount.merge(generator.getClass(), 1L, Long::sum); 603 604 newCost = computeCost(cluster, currentCost); 605 606 // Should this be kept? 607 if (newCost < currentCost) { 608 currentCost = newCost; 609 generatorToApprovedActionCount.merge(generator.getClass(), 1L, Long::sum); 610 611 // save for JMX 612 curOverallCost = currentCost; 613 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 614 } else { 615 // Put things back the way they were before. 616 // TODO: undo by remembering old values 617 BalanceAction undoAction = action.undoAction(); 618 cluster.doAction(undoAction); 619 updateCostsAndWeightsWithAction(cluster, undoAction); 620 } 621 622 if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) { 623 break; 624 } 625 } 626 long endTime = EnvironmentEdgeManager.currentTime(); 627 628 StringJoiner joiner = new StringJoiner("\n"); 629 joiner.add("CandidateGenerator activity summary:"); 630 generatorToStepCount.forEach((generator, count) -> { 631 long approvals = generatorToApprovedActionCount.getOrDefault(generator, 0L); 632 joiner.add(String.format(" - %s: %d steps, %d approvals", generator.getSimpleName(), count, 633 approvals)); 634 }); 635 LOG.debug(joiner.toString()); 636 637 metricsBalancer.balanceCluster(endTime - startTime); 638 639 if (initCost > currentCost) { 640 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 641 List<RegionPlan> plans = createRegionPlans(cluster); 642 LOG.info( 643 "Finished computing new moving plan. Computation took {} ms" 644 + " to try {} different iterations. Found a solution that moves " 645 + "{} regions; Going from a computed imbalance of {}" 646 + " to a new imbalance of {}. funtionCost={}", 647 endTime - startTime, step, plans.size(), initCost / sumMultiplier, 648 currentCost / sumMultiplier, functionCost()); 649 sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step); 650 return plans; 651 } 652 LOG.info( 653 "Could not find a better moving plan. Tried {} different configurations in " 654 + "{} ms, and did not find anything with an imbalance score less than {}", 655 step, endTime - startTime, initCost / sumMultiplier); 656 return null; 657 } 658 659 protected void sendRejectionReasonToRingBuffer(Supplier<String> reason, 660 List<CostFunction> costFunctions) { 661 provider.recordBalancerRejection(() -> { 662 BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get()); 663 if (costFunctions != null) { 664 for (CostFunction c : costFunctions) { 665 if (!c.isNeeded()) { 666 continue; 667 } 668 builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier()); 669 } 670 } 671 return builder.build(); 672 }); 673 } 674 675 private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost, 676 double initCost, String initFunctionTotalCosts, long step) { 677 provider.recordBalancerDecision(() -> { 678 List<String> regionPlans = new ArrayList<>(); 679 for (RegionPlan plan : plans) { 680 regionPlans 681 .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName() 682 + " , source: " + plan.getSource() + " , destination: " + plan.getDestination()); 683 } 684 return new BalancerDecision.Builder().setInitTotalCost(initCost) 685 .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost) 686 .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step) 687 .setRegionPlans(regionPlans).build(); 688 }); 689 } 690 691 /** 692 * update costs to JMX 693 */ 694 private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) { 695 if (tableName == null) { 696 return; 697 } 698 699 // check if the metricsBalancer is MetricsStochasticBalancer before casting 700 if (metricsBalancer instanceof MetricsStochasticBalancer) { 701 MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer; 702 // overall cost 703 balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME, 704 "Overall cost", overall); 705 706 // each cost function 707 for (int i = 0; i < costFunctions.size(); i++) { 708 CostFunction costFunction = costFunctions.get(i); 709 String costFunctionName = costFunction.getClass().getSimpleName(); 710 double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall); 711 // TODO: cost function may need a specific description 712 balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName, 713 "The percent of " + costFunctionName, costPercent); 714 } 715 } 716 } 717 718 private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) { 719 float multiplier = costFunction.getMultiplier(); 720 if (multiplier > 0) { 721 costFunctions.add(costFunction); 722 } 723 } 724 725 protected String functionCost() { 726 StringBuilder builder = new StringBuilder(); 727 for (CostFunction c : costFunctions) { 728 builder.append(c.getClass().getSimpleName()); 729 builder.append(" : ("); 730 if (c.isNeeded()) { 731 builder.append("multiplier=" + c.getMultiplier()); 732 builder.append(", "); 733 double cost = c.cost(); 734 builder.append("imbalance=" + cost); 735 if (cost >= minCostNeedBalance) { 736 builder.append(", need balance"); 737 } 738 } else { 739 builder.append("not needed"); 740 } 741 builder.append("); "); 742 } 743 return builder.toString(); 744 } 745 746 @RestrictedApi(explanation = "Should only be called in tests", link = "", 747 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 748 List<CostFunction> getCostFunctions() { 749 return costFunctions; 750 } 751 752 private String totalCostsPerFunc() { 753 StringBuilder builder = new StringBuilder(); 754 for (CostFunction c : costFunctions) { 755 if (!c.isNeeded()) { 756 continue; 757 } 758 double cost = c.getMultiplier() * c.cost(); 759 if (cost > 0.0) { 760 builder.append(" "); 761 builder.append(c.getClass().getSimpleName()); 762 builder.append(" : "); 763 builder.append(cost); 764 builder.append(";"); 765 } 766 } 767 if (builder.length() > 0) { 768 builder.deleteCharAt(builder.length() - 1); 769 } 770 return builder.toString(); 771 } 772 773 /** 774 * Create all of the RegionPlan's needed to move from the initial cluster state to the desired 775 * state. 776 * @param cluster The state of the cluster 777 * @return List of RegionPlan's that represent the moves needed to get to desired final state. 778 */ 779 private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) { 780 List<RegionPlan> plans = new ArrayList<>(); 781 for (int regionIndex = 0; regionIndex 782 < cluster.regionIndexToServerIndex.length; regionIndex++) { 783 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex]; 784 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex]; 785 786 if (initialServerIndex != newServerIndex) { 787 RegionInfo region = cluster.regions[regionIndex]; 788 ServerName initialServer = cluster.servers[initialServerIndex]; 789 ServerName newServer = cluster.servers[newServerIndex]; 790 791 if (LOG.isTraceEnabled()) { 792 LOG.trace("Moving Region " + region.getEncodedName() + " from server " 793 + initialServer.getHostname() + " to " + newServer.getHostname()); 794 } 795 RegionPlan rp = new RegionPlan(region, initialServer, newServer); 796 plans.add(rp); 797 } 798 } 799 return plans; 800 } 801 802 /** 803 * Store the current region loads. 804 */ 805 private void updateRegionLoad() { 806 // We create a new hashmap so that regions that are no longer there are removed. 807 // However we temporarily need the old loads so we can use them to keep the rolling average. 808 Map<String, Deque<BalancerRegionLoad>> oldLoads = loads; 809 loads = new HashMap<>(); 810 811 clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> { 812 sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> { 813 String regionNameAsString = RegionInfo.getRegionNameAsString(regionName); 814 Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString); 815 if (rLoads == null) { 816 rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1); 817 } else if (rLoads.size() >= numRegionLoadsToRemember) { 818 rLoads.remove(); 819 } 820 rLoads.add(new BalancerRegionLoad(rm)); 821 loads.put(regionNameAsString, rLoads); 822 }); 823 }); 824 } 825 826 @RestrictedApi(explanation = "Should only be called in tests", link = "", 827 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 828 void initCosts(BalancerClusterState cluster) { 829 weightsOfGenerators.clear(); 830 for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) { 831 weightsOfGenerators.put(clazz, 0.0); 832 } 833 for (CostFunction c : costFunctions) { 834 c.prepare(cluster); 835 c.updateWeight(weightsOfGenerators); 836 } 837 } 838 839 /** 840 * Update both the costs of costfunctions and the weights of candidate generators 841 */ 842 @RestrictedApi(explanation = "Should only be called in tests", link = "", 843 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 844 void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) { 845 // Reset all the weights to 0 846 for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) { 847 weightsOfGenerators.put(clazz, 0.0); 848 } 849 for (CostFunction c : costFunctions) { 850 if (c.isNeeded()) { 851 c.postAction(action); 852 c.updateWeight(weightsOfGenerators); 853 } 854 } 855 } 856 857 /** 858 * Get the names of the cost functions 859 */ 860 @RestrictedApi(explanation = "Should only be called in tests", link = "", 861 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 862 String[] getCostFunctionNames() { 863 String[] ret = new String[costFunctions.size()]; 864 for (int i = 0; i < costFunctions.size(); i++) { 865 CostFunction c = costFunctions.get(i); 866 ret[i] = c.getClass().getSimpleName(); 867 } 868 869 return ret; 870 } 871 872 /** 873 * This is the main cost function. It will compute a cost associated with a proposed cluster 874 * state. All different costs will be combined with their multipliers to produce a double cost. 875 * @param cluster The state of the cluster 876 * @param previousCost the previous cost. This is used as an early out. 877 * @return a double of a cost associated with the proposed cluster state. This cost is an 878 * aggregate of all individual cost functions. 879 */ 880 @RestrictedApi(explanation = "Should only be called in tests", link = "", 881 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 882 double computeCost(BalancerClusterState cluster, double previousCost) { 883 double total = 0; 884 885 for (int i = 0; i < costFunctions.size(); i++) { 886 CostFunction c = costFunctions.get(i); 887 this.tempFunctionCosts[i] = 0.0; 888 889 if (!c.isNeeded()) { 890 continue; 891 } 892 893 Float multiplier = c.getMultiplier(); 894 double cost = c.cost(); 895 896 this.tempFunctionCosts[i] = multiplier * cost; 897 total += this.tempFunctionCosts[i]; 898 899 if (total > previousCost) { 900 break; 901 } 902 } 903 904 return total; 905 } 906 907 /** 908 * A helper function to compose the attribute name from tablename and costfunction name 909 */ 910 static String composeAttributeName(String tableName, String costFunctionName) { 911 return tableName + TABLE_FUNCTION_SEP + costFunctionName; 912 } 913}