001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.lang.reflect.Constructor; 022import java.util.ArrayDeque; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.ThreadLocalRandom; 030import java.util.function.Supplier; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.ClusterMetrics; 033import org.apache.hadoop.hbase.HBaseInterfaceAudience; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.RegionMetrics; 036import org.apache.hadoop.hbase.ServerMetrics; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.BalancerDecision; 040import org.apache.hadoop.hbase.client.BalancerRejection; 041import org.apache.hadoop.hbase.client.RegionInfo; 042import org.apache.hadoop.hbase.master.RackManager; 043import org.apache.hadoop.hbase.master.RegionPlan; 044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 045import org.apache.hadoop.hbase.util.Pair; 046import org.apache.hadoop.hbase.util.ReflectionUtils; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * <p> 053 * This is a best effort load balancer. Given a Cost function F(C) => x It will randomly try and 054 * mutate the cluster to Cprime. If F(Cprime) < F(C) then the new cluster state becomes the plan. 055 * It includes costs functions to compute the cost of: 056 * </p> 057 * <ul> 058 * <li>Region Load</li> 059 * <li>Table Load</li> 060 * <li>Data Locality</li> 061 * <li>Memstore Sizes</li> 062 * <li>Storefile Sizes</li> 063 * </ul> 064 * <p> 065 * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best 066 * solution, and 1 is the highest possible cost and the worst solution. The computed costs are 067 * scaled by their respective multipliers: 068 * </p> 069 * <ul> 070 * <li>hbase.master.balancer.stochastic.regionLoadCost</li> 071 * <li>hbase.master.balancer.stochastic.moveCost</li> 072 * <li>hbase.master.balancer.stochastic.tableLoadCost</li> 073 * <li>hbase.master.balancer.stochastic.localityCost</li> 074 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li> 075 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li> 076 * </ul> 077 * <p> 078 * You can also add custom Cost function by setting the the following configuration value: 079 * </p> 080 * <ul> 081 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li> 082 * </ul> 083 * <p> 084 * All custom Cost Functions needs to extends {@link CostFunction} 085 * </p> 086 * <p> 087 * In addition to the above configurations, the balancer can be tuned by the following configuration 088 * values: 089 * </p> 090 * <ul> 091 * <li>hbase.master.balancer.stochastic.maxMoveRegions which controls what the max number of regions 092 * that can be moved in a single invocation of this balancer.</li> 093 * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of 094 * regions is multiplied to try and get the number of times the balancer will mutate all 095 * servers.</li> 096 * <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that the 097 * balancer will try and mutate all the servers. The balancer will use the minimum of this value and 098 * the above computation.</li> 099 * </ul> 100 * <p> 101 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the 102 * balancer gets the full picture of all loads on the cluster. 103 * </p> 104 */ 105@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 106public class StochasticLoadBalancer extends BaseLoadBalancer { 107 108 private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class); 109 110 protected static final String STEPS_PER_REGION_KEY = 111 "hbase.master.balancer.stochastic.stepsPerRegion"; 112 protected static final int DEFAULT_STEPS_PER_REGION = 800; 113 protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps"; 114 protected static final int DEFAULT_MAX_STEPS = 1000000; 115 protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps"; 116 protected static final boolean DEFAULT_RUN_MAX_STEPS = false; 117 protected static final String MAX_RUNNING_TIME_KEY = 118 "hbase.master.balancer.stochastic.maxRunningTime"; 119 protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds. 120 protected static final String KEEP_REGION_LOADS = 121 "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; 122 protected static final int DEFAULT_KEEP_REGION_LOADS = 15; 123 private static final String TABLE_FUNCTION_SEP = "_"; 124 protected static final String MIN_COST_NEED_BALANCE_KEY = 125 "hbase.master.balancer.stochastic.minCostNeedBalance"; 126 protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f; 127 protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY = 128 "hbase.master.balancer.stochastic.additionalCostFunctions"; 129 public static final String OVERALL_COST_FUNCTION_NAME = "Overall"; 130 131 Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>(); 132 133 // values are defaults 134 private int maxSteps = DEFAULT_MAX_STEPS; 135 private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS; 136 private int stepsPerRegion = DEFAULT_STEPS_PER_REGION; 137 private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME; 138 private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS; 139 private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE; 140 Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>(); 141 142 protected List<CostFunction> costFunctions; // FindBugs: Wants this protected; 143 // IS2_INCONSISTENT_SYNC 144 // To save currently configed sum of multiplier. Defaulted at 1 for cases that carry high cost 145 private float sumMultiplier; 146 // to save and report costs to JMX 147 private double curOverallCost = 0d; 148 private double[] tempFunctionCosts; 149 private double[] curFunctionCosts; 150 private double[] weightsOfGenerators; 151 152 // Keep locality based picker and cost function to alert them 153 // when new services are offered 154 private LocalityBasedCandidateGenerator localityCandidateGenerator; 155 private ServerLocalityCostFunction localityCost; 156 private RackLocalityCostFunction rackLocalityCost; 157 private RegionReplicaHostCostFunction regionReplicaHostCostFunction; 158 private RegionReplicaRackCostFunction regionReplicaRackCostFunction; 159 160 protected List<CandidateGenerator> candidateGenerators; 161 162 public enum GeneratorType { 163 RANDOM, 164 LOAD, 165 LOCALITY, 166 RACK 167 } 168 169 /** 170 * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its 171 * default MetricsBalancer 172 */ 173 public StochasticLoadBalancer() { 174 super(new MetricsStochasticBalancer()); 175 } 176 177 @RestrictedApi(explanation = "Should only be called in tests", link = "", 178 allowedOnPath = ".*/src/test/.*") 179 public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) { 180 super(metricsStochasticBalancer); 181 } 182 183 private static CostFunction createCostFunction(Class<? extends CostFunction> clazz, 184 Configuration conf) { 185 try { 186 Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class); 187 return ReflectionUtils.instantiate(clazz.getName(), ctor, conf); 188 } catch (NoSuchMethodException e) { 189 // will try construct with no parameter 190 } 191 return ReflectionUtils.newInstance(clazz); 192 } 193 194 private void loadCustomCostFunctions(Configuration conf) { 195 String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY); 196 197 if (null == functionsNames) { 198 return; 199 } 200 for (String className : functionsNames) { 201 Class<? extends CostFunction> clazz; 202 try { 203 clazz = Class.forName(className).asSubclass(CostFunction.class); 204 } catch (ClassNotFoundException e) { 205 LOG.warn("Cannot load class '{}': {}", className, e.getMessage()); 206 continue; 207 } 208 CostFunction func = createCostFunction(clazz, conf); 209 LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName()); 210 costFunctions.add(func); 211 } 212 } 213 214 @RestrictedApi(explanation = "Should only be called in tests", link = "", 215 allowedOnPath = ".*/src/test/.*") 216 List<CandidateGenerator> getCandidateGenerators() { 217 return this.candidateGenerators; 218 } 219 220 protected List<CandidateGenerator> createCandidateGenerators() { 221 List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4); 222 candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator()); 223 candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator()); 224 candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator); 225 candidateGenerators.add(GeneratorType.RACK.ordinal(), 226 new RegionReplicaRackCandidateGenerator()); 227 return candidateGenerators; 228 } 229 230 protected List<CostFunction> createCostFunctions(Configuration conf) { 231 List<CostFunction> costFunctions = new ArrayList<>(); 232 addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf)); 233 addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf)); 234 addCostFunction(costFunctions, new MoveCostFunction(conf, provider)); 235 addCostFunction(costFunctions, localityCost); 236 addCostFunction(costFunctions, rackLocalityCost); 237 addCostFunction(costFunctions, new TableSkewCostFunction(conf)); 238 addCostFunction(costFunctions, regionReplicaHostCostFunction); 239 addCostFunction(costFunctions, regionReplicaRackCostFunction); 240 addCostFunction(costFunctions, new ReadRequestCostFunction(conf)); 241 addCostFunction(costFunctions, new CPRequestCostFunction(conf)); 242 addCostFunction(costFunctions, new WriteRequestCostFunction(conf)); 243 addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf)); 244 addCostFunction(costFunctions, new StoreFileCostFunction(conf)); 245 return costFunctions; 246 } 247 248 @Override 249 protected void loadConf(Configuration conf) { 250 super.loadConf(conf); 251 maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS); 252 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION); 253 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME); 254 runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS); 255 256 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS); 257 minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE); 258 localityCandidateGenerator = new LocalityBasedCandidateGenerator(); 259 localityCost = new ServerLocalityCostFunction(conf); 260 rackLocalityCost = new RackLocalityCostFunction(conf); 261 262 this.candidateGenerators = createCandidateGenerators(); 263 264 regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); 265 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); 266 this.costFunctions = createCostFunctions(conf); 267 loadCustomCostFunctions(conf); 268 269 curFunctionCosts = new double[costFunctions.size()]; 270 tempFunctionCosts = new double[costFunctions.size()]; 271 272 LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps 273 + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable=" 274 + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames()) 275 + " , sum of multiplier of cost functions = " + sumMultiplier + " etc."); 276 } 277 278 @Override 279 public void updateClusterMetrics(ClusterMetrics st) { 280 super.updateClusterMetrics(st); 281 updateRegionLoad(); 282 283 // update metrics size 284 try { 285 // by-table or ensemble mode 286 int tablesCount = isByTable ? provider.getNumberOfTables() : 1; 287 int functionsCount = getCostFunctionNames().length; 288 289 updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall 290 } catch (Exception e) { 291 LOG.error("failed to get the size of all tables", e); 292 } 293 } 294 295 private void updateBalancerTableLoadInfo(TableName tableName, 296 Map<ServerName, List<RegionInfo>> loadOfOneTable) { 297 RegionHDFSBlockLocationFinder finder = null; 298 if ((this.localityCost != null) || (this.rackLocalityCost != null)) { 299 finder = this.regionFinder; 300 } 301 BalancerClusterState cluster = 302 new BalancerClusterState(loadOfOneTable, loads, finder, rackManager); 303 304 initCosts(cluster); 305 curOverallCost = computeCost(cluster, Double.MAX_VALUE); 306 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 307 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 308 } 309 310 @Override 311 public void 312 updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) { 313 if (isByTable) { 314 loadOfAllTable.forEach((tableName, loadOfOneTable) -> { 315 updateBalancerTableLoadInfo(tableName, loadOfOneTable); 316 }); 317 } else { 318 updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME, 319 toEnsumbleTableLoad(loadOfAllTable)); 320 } 321 } 322 323 /** 324 * Update the number of metrics that are reported to JMX 325 */ 326 @RestrictedApi(explanation = "Should only be called in tests", link = "", 327 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 328 void updateMetricsSize(int size) { 329 if (metricsBalancer instanceof MetricsStochasticBalancer) { 330 ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size); 331 } 332 } 333 334 private boolean areSomeRegionReplicasColocated(BalancerClusterState c) { 335 regionReplicaHostCostFunction.prepare(c); 336 return (Math.abs(regionReplicaHostCostFunction.cost()) > CostFunction.COST_EPSILON); 337 } 338 339 private String getBalanceReason(double total, double sumMultiplier) { 340 if (total <= 0) { 341 return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0"; 342 } else if (sumMultiplier <= 0) { 343 return "sumMultiplier = " + sumMultiplier + " <= 0"; 344 } else if ((total / sumMultiplier) < minCostNeedBalance) { 345 return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " 346 + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")"; 347 } else { 348 return ""; 349 } 350 } 351 352 @RestrictedApi(explanation = "Should only be called in tests", link = "", 353 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 354 boolean needsBalance(TableName tableName, BalancerClusterState cluster) { 355 ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); 356 if (cs.getNumServers() < MIN_SERVER_BALANCE) { 357 LOG.info( 358 "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)"); 359 sendRejectionReasonToRingBuffer(() -> "The number of RegionServers " + cs.getNumServers() 360 + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null); 361 return false; 362 } 363 if (areSomeRegionReplicasColocated(cluster)) { 364 LOG.info("Running balancer because at least one server hosts replicas of the same region." 365 + " function cost={}", functionCost()); 366 return true; 367 } 368 369 if (idleRegionServerExist(cluster)) { 370 LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}", 371 functionCost()); 372 return true; 373 } 374 375 if (sloppyRegionServerExist(cs)) { 376 LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}", 377 functionCost()); 378 return true; 379 } 380 381 double total = 0.0; 382 for (CostFunction c : costFunctions) { 383 if (!c.isNeeded()) { 384 LOG.trace("{} not needed", c.getClass().getSimpleName()); 385 continue; 386 } 387 total += c.cost() * c.getMultiplier(); 388 } 389 boolean balanced = (total / sumMultiplier < minCostNeedBalance); 390 391 if (balanced) { 392 final double calculatedTotal = total; 393 sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, sumMultiplier), 394 costFunctions); 395 LOG.info( 396 "{} - skipping load balancing because weighted average imbalance={} <= " 397 + "threshold({}). If you want more aggressive balancing, either lower " 398 + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative " 399 + "multiplier(s) of the specific cost function(s). functionCost={}", 400 isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier, 401 minCostNeedBalance, minCostNeedBalance, functionCost()); 402 } else { 403 LOG.info("{} - Calculating plan. may take up to {}ms to complete.", 404 isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime); 405 } 406 return !balanced; 407 } 408 409 @RestrictedApi(explanation = "Should only be called in tests", link = "", 410 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 411 BalanceAction nextAction(BalancerClusterState cluster) { 412 return getRandomGenerator().generate(cluster); 413 } 414 415 /** 416 * Select the candidate generator to use based on the cost of cost functions. The chance of 417 * selecting a candidate generator is propotional to the share of cost of all cost functions among 418 * all cost functions that benefit from it. 419 */ 420 protected CandidateGenerator getRandomGenerator() { 421 double sum = 0; 422 for (int i = 0; i < weightsOfGenerators.length; i++) { 423 sum += weightsOfGenerators[i]; 424 weightsOfGenerators[i] = sum; 425 } 426 if (sum == 0) { 427 return candidateGenerators.get(0); 428 } 429 for (int i = 0; i < weightsOfGenerators.length; i++) { 430 weightsOfGenerators[i] /= sum; 431 } 432 double rand = ThreadLocalRandom.current().nextDouble(); 433 for (int i = 0; i < weightsOfGenerators.length; i++) { 434 if (rand <= weightsOfGenerators[i]) { 435 return candidateGenerators.get(i); 436 } 437 } 438 return candidateGenerators.get(candidateGenerators.size() - 1); 439 } 440 441 @RestrictedApi(explanation = "Should only be called in tests", link = "", 442 allowedOnPath = ".*/src/test/.*") 443 void setRackManager(RackManager rackManager) { 444 this.rackManager = rackManager; 445 } 446 447 private long calculateMaxSteps(BalancerClusterState cluster) { 448 return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers; 449 } 450 451 /** 452 * Given the cluster state this will try and approach an optimal balance. This should always 453 * approach the optimal state given enough steps. 454 */ 455 @Override 456 protected List<RegionPlan> balanceTable(TableName tableName, 457 Map<ServerName, List<RegionInfo>> loadOfOneTable) { 458 // On clusters with lots of HFileLinks or lots of reference files, 459 // instantiating the storefile infos can be quite expensive. 460 // Allow turning this feature off if the locality cost is not going to 461 // be used in any computations. 462 RegionHDFSBlockLocationFinder finder = null; 463 if ((this.localityCost != null) || (this.rackLocalityCost != null)) { 464 finder = this.regionFinder; 465 } 466 467 // The clusterState that is given to this method contains the state 468 // of all the regions in the table(s) (that's true today) 469 // Keep track of servers to iterate through them. 470 BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder, 471 rackManager, regionCacheRatioOnOldServerMap); 472 473 long startTime = EnvironmentEdgeManager.currentTime(); 474 475 initCosts(cluster); 476 477 sumMultiplier = 0; 478 for (CostFunction c : costFunctions) { 479 if (c.isNeeded()) { 480 sumMultiplier += c.getMultiplier(); 481 } 482 } 483 if (sumMultiplier <= 0) { 484 LOG.error("At least one cost function needs a multiplier > 0. For example, set " 485 + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default"); 486 return null; 487 } 488 489 double currentCost = computeCost(cluster, Double.MAX_VALUE); 490 curOverallCost = currentCost; 491 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 492 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 493 double initCost = currentCost; 494 double newCost; 495 496 if (!needsBalance(tableName, cluster)) { 497 return null; 498 } 499 500 long computedMaxSteps; 501 if (runMaxSteps) { 502 computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster)); 503 } else { 504 long calculatedMaxSteps = calculateMaxSteps(cluster); 505 computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps); 506 if (calculatedMaxSteps > maxSteps) { 507 LOG.warn( 508 "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than " 509 + "maxSteps:{}. Hence load balancing may not work well. Setting parameter " 510 + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue." 511 + "(This config change does not require service restart)", 512 calculatedMaxSteps, maxSteps); 513 } 514 } 515 LOG.info( 516 "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, " 517 + "functionCost={} computedMaxSteps={}", 518 currentCost / sumMultiplier, functionCost(), computedMaxSteps); 519 520 final String initFunctionTotalCosts = totalCostsPerFunc(); 521 // Perform a stochastic walk to see if we can get a good fit. 522 long step; 523 524 for (step = 0; step < computedMaxSteps; step++) { 525 BalanceAction action = nextAction(cluster); 526 527 if (action.getType() == BalanceAction.Type.NULL) { 528 continue; 529 } 530 531 cluster.doAction(action); 532 updateCostsAndWeightsWithAction(cluster, action); 533 534 newCost = computeCost(cluster, currentCost); 535 536 // Should this be kept? 537 if (newCost < currentCost) { 538 currentCost = newCost; 539 540 // save for JMX 541 curOverallCost = currentCost; 542 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 543 } else { 544 // Put things back the way they were before. 545 // TODO: undo by remembering old values 546 BalanceAction undoAction = action.undoAction(); 547 cluster.doAction(undoAction); 548 updateCostsAndWeightsWithAction(cluster, undoAction); 549 } 550 551 if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) { 552 break; 553 } 554 } 555 long endTime = EnvironmentEdgeManager.currentTime(); 556 557 metricsBalancer.balanceCluster(endTime - startTime); 558 559 if (initCost > currentCost) { 560 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 561 List<RegionPlan> plans = createRegionPlans(cluster); 562 LOG.info( 563 "Finished computing new moving plan. Computation took {} ms" 564 + " to try {} different iterations. Found a solution that moves " 565 + "{} regions; Going from a computed imbalance of {}" 566 + " to a new imbalance of {}. funtionCost={}", 567 endTime - startTime, step, plans.size(), initCost / sumMultiplier, 568 currentCost / sumMultiplier, functionCost()); 569 sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step); 570 return plans; 571 } 572 LOG.info( 573 "Could not find a better moving plan. Tried {} different configurations in " 574 + "{} ms, and did not find anything with an imbalance score less than {}", 575 step, endTime - startTime, initCost / sumMultiplier); 576 return null; 577 } 578 579 protected void sendRejectionReasonToRingBuffer(Supplier<String> reason, 580 List<CostFunction> costFunctions) { 581 provider.recordBalancerRejection(() -> { 582 BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get()); 583 if (costFunctions != null) { 584 for (CostFunction c : costFunctions) { 585 if (!c.isNeeded()) { 586 continue; 587 } 588 builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier()); 589 } 590 } 591 return builder.build(); 592 }); 593 } 594 595 private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost, 596 double initCost, String initFunctionTotalCosts, long step) { 597 provider.recordBalancerDecision(() -> { 598 List<String> regionPlans = new ArrayList<>(); 599 for (RegionPlan plan : plans) { 600 regionPlans 601 .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName() 602 + " , source: " + plan.getSource() + " , destination: " + plan.getDestination()); 603 } 604 return new BalancerDecision.Builder().setInitTotalCost(initCost) 605 .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost) 606 .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step) 607 .setRegionPlans(regionPlans).build(); 608 }); 609 } 610 611 /** 612 * update costs to JMX 613 */ 614 private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) { 615 if (tableName == null) { 616 return; 617 } 618 619 // check if the metricsBalancer is MetricsStochasticBalancer before casting 620 if (metricsBalancer instanceof MetricsStochasticBalancer) { 621 MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer; 622 // overall cost 623 balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME, 624 "Overall cost", overall); 625 626 // each cost function 627 for (int i = 0; i < costFunctions.size(); i++) { 628 CostFunction costFunction = costFunctions.get(i); 629 String costFunctionName = costFunction.getClass().getSimpleName(); 630 double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall); 631 // TODO: cost function may need a specific description 632 balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName, 633 "The percent of " + costFunctionName, costPercent); 634 } 635 } 636 } 637 638 private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) { 639 float multiplier = costFunction.getMultiplier(); 640 if (multiplier > 0) { 641 costFunctions.add(costFunction); 642 } 643 } 644 645 protected String functionCost() { 646 StringBuilder builder = new StringBuilder(); 647 for (CostFunction c : costFunctions) { 648 builder.append(c.getClass().getSimpleName()); 649 builder.append(" : ("); 650 if (c.isNeeded()) { 651 builder.append("multiplier=" + c.getMultiplier()); 652 builder.append(", "); 653 double cost = c.cost(); 654 builder.append("imbalance=" + cost); 655 if (cost >= minCostNeedBalance) { 656 builder.append(", need balance"); 657 } 658 } else { 659 builder.append("not needed"); 660 } 661 builder.append("); "); 662 } 663 return builder.toString(); 664 } 665 666 @RestrictedApi(explanation = "Should only be called in tests", link = "", 667 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 668 List<CostFunction> getCostFunctions() { 669 return costFunctions; 670 } 671 672 private String totalCostsPerFunc() { 673 StringBuilder builder = new StringBuilder(); 674 for (CostFunction c : costFunctions) { 675 if (!c.isNeeded()) { 676 continue; 677 } 678 double cost = c.getMultiplier() * c.cost(); 679 if (cost > 0.0) { 680 builder.append(" "); 681 builder.append(c.getClass().getSimpleName()); 682 builder.append(" : "); 683 builder.append(cost); 684 builder.append(";"); 685 } 686 } 687 if (builder.length() > 0) { 688 builder.deleteCharAt(builder.length() - 1); 689 } 690 return builder.toString(); 691 } 692 693 /** 694 * Create all of the RegionPlan's needed to move from the initial cluster state to the desired 695 * state. 696 * @param cluster The state of the cluster 697 * @return List of RegionPlan's that represent the moves needed to get to desired final state. 698 */ 699 private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) { 700 List<RegionPlan> plans = new ArrayList<>(); 701 for (int regionIndex = 0; regionIndex 702 < cluster.regionIndexToServerIndex.length; regionIndex++) { 703 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex]; 704 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex]; 705 706 if (initialServerIndex != newServerIndex) { 707 RegionInfo region = cluster.regions[regionIndex]; 708 ServerName initialServer = cluster.servers[initialServerIndex]; 709 ServerName newServer = cluster.servers[newServerIndex]; 710 711 if (LOG.isTraceEnabled()) { 712 LOG.trace("Moving Region " + region.getEncodedName() + " from server " 713 + initialServer.getHostname() + " to " + newServer.getHostname()); 714 } 715 RegionPlan rp = new RegionPlan(region, initialServer, newServer); 716 plans.add(rp); 717 } 718 } 719 return plans; 720 } 721 722 /** 723 * Store the current region loads. 724 */ 725 private void updateRegionLoad() { 726 // We create a new hashmap so that regions that are no longer there are removed. 727 // However we temporarily need the old loads so we can use them to keep the rolling average. 728 Map<String, Deque<BalancerRegionLoad>> oldLoads = loads; 729 loads = new HashMap<>(); 730 731 clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> { 732 sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> { 733 String regionNameAsString = RegionInfo.getRegionNameAsString(regionName); 734 Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString); 735 if (rLoads == null) { 736 rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1); 737 } else if (rLoads.size() >= numRegionLoadsToRemember) { 738 rLoads.remove(); 739 } 740 rLoads.add(new BalancerRegionLoad(rm)); 741 loads.put(regionNameAsString, rLoads); 742 }); 743 }); 744 } 745 746 @RestrictedApi(explanation = "Should only be called in tests", link = "", 747 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 748 void initCosts(BalancerClusterState cluster) { 749 // Initialize the weights of generator every time 750 weightsOfGenerators = new double[this.candidateGenerators.size()]; 751 for (CostFunction c : costFunctions) { 752 c.prepare(cluster); 753 c.updateWeight(weightsOfGenerators); 754 } 755 } 756 757 /** 758 * Update both the costs of costfunctions and the weights of candidate generators 759 */ 760 @RestrictedApi(explanation = "Should only be called in tests", link = "", 761 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 762 void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) { 763 // Reset all the weights to 0 764 for (int i = 0; i < weightsOfGenerators.length; i++) { 765 weightsOfGenerators[i] = 0; 766 } 767 for (CostFunction c : costFunctions) { 768 if (c.isNeeded()) { 769 c.postAction(action); 770 c.updateWeight(weightsOfGenerators); 771 } 772 } 773 } 774 775 /** 776 * Get the names of the cost functions 777 */ 778 @RestrictedApi(explanation = "Should only be called in tests", link = "", 779 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 780 String[] getCostFunctionNames() { 781 String[] ret = new String[costFunctions.size()]; 782 for (int i = 0; i < costFunctions.size(); i++) { 783 CostFunction c = costFunctions.get(i); 784 ret[i] = c.getClass().getSimpleName(); 785 } 786 787 return ret; 788 } 789 790 /** 791 * This is the main cost function. It will compute a cost associated with a proposed cluster 792 * state. All different costs will be combined with their multipliers to produce a double cost. 793 * @param cluster The state of the cluster 794 * @param previousCost the previous cost. This is used as an early out. 795 * @return a double of a cost associated with the proposed cluster state. This cost is an 796 * aggregate of all individual cost functions. 797 */ 798 @RestrictedApi(explanation = "Should only be called in tests", link = "", 799 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 800 double computeCost(BalancerClusterState cluster, double previousCost) { 801 double total = 0; 802 803 for (int i = 0; i < costFunctions.size(); i++) { 804 CostFunction c = costFunctions.get(i); 805 this.tempFunctionCosts[i] = 0.0; 806 807 if (!c.isNeeded()) { 808 continue; 809 } 810 811 Float multiplier = c.getMultiplier(); 812 double cost = c.cost(); 813 814 this.tempFunctionCosts[i] = multiplier * cost; 815 total += this.tempFunctionCosts[i]; 816 817 if (total > previousCost) { 818 break; 819 } 820 } 821 822 return total; 823 } 824 825 /** 826 * A helper function to compose the attribute name from tablename and costfunction name 827 */ 828 static String composeAttributeName(String tableName, String costFunctionName) { 829 return tableName + TABLE_FUNCTION_SEP + costFunctionName; 830 } 831}