001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.lang.reflect.Constructor; 022import java.util.ArrayDeque; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.ThreadLocalRandom; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.ClusterMetrics; 032import org.apache.hadoop.hbase.HBaseInterfaceAudience; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.RegionMetrics; 035import org.apache.hadoop.hbase.ServerMetrics; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.BalancerDecision; 039import org.apache.hadoop.hbase.client.BalancerRejection; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.master.RackManager; 042import org.apache.hadoop.hbase.master.RegionPlan; 043import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; 
044import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails; 045import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.util.ReflectionUtils; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * <p> 054 * This is a best-effort load balancer. Given a cost function F(C) => x, it will randomly try to 055 * mutate the cluster to Cprime. If F(Cprime) < F(C), the new cluster state becomes the plan. 056 * It includes cost functions to compute the cost of: 057 * </p> 058 * <ul> 059 * <li>Region Load</li> 060 * <li>Table Load</li> 061 * <li>Data Locality</li> 062 * <li>Memstore Sizes</li> 063 * <li>Storefile Sizes</li> 064 * </ul> 065 * <p> 066 * Every cost function returns a number between 0 and 1 inclusive, where 0 is the lowest-cost (best) 067 * solution and 1 is the highest possible cost (worst) solution.
The computed costs are 068 * scaled by their respective multipliers: 069 * </p> 070 * <ul> 071 * <li>hbase.master.balancer.stochastic.regionLoadCost</li> 072 * <li>hbase.master.balancer.stochastic.moveCost</li> 073 * <li>hbase.master.balancer.stochastic.tableLoadCost</li> 074 * <li>hbase.master.balancer.stochastic.localityCost</li> 075 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li> 076 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li> 077 * </ul> 078 * <p> 079 * You can also add custom cost functions by setting the following configuration value: 080 * </p> 081 * <ul> 082 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li> 083 * </ul> 084 * <p> 085 * All custom cost functions need to extend {@link CostFunction}. 086 * </p> 087 * <p> 088 * In addition to the above configurations, the balancer can be tuned by the following configuration 089 * values: 090 * </p> 091 * <ul> 092 * <li>hbase.master.balancer.stochastic.maxMoveRegions, which controls the maximum number of regions 093 * that can be moved in a single invocation of this balancer.</li> 094 * <li>hbase.master.balancer.stochastic.stepsPerRegion, the coefficient by which the number of 095 * regions times the number of servers is multiplied to get the number of steps the balancer will 096 * try.</li> 097 * <li>hbase.master.balancer.stochastic.maxSteps, which controls the maximum number of times that the 098 * balancer will try to mutate the cluster. The balancer will use the minimum of this value and 099 * the above computation (or the maximum, when hbase.master.balancer.stochastic.runMaxSteps is true).</li> 100 * </ul> 101 * <p> 102 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the 103 * balancer gets the full picture of all loads on the cluster.
104 * </p> 105 */ 106@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 107public class StochasticLoadBalancer extends BaseLoadBalancer { 108 109 private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class); 110 111 protected static final String STEPS_PER_REGION_KEY = 112 "hbase.master.balancer.stochastic.stepsPerRegion"; 113 protected static final int DEFAULT_STEPS_PER_REGION = 800; 114 protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps"; 115 protected static final int DEFAULT_MAX_STEPS = 1000000; 116 protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps"; 117 protected static final boolean DEFAULT_RUN_MAX_STEPS = false; 118 protected static final String MAX_RUNNING_TIME_KEY = 119 "hbase.master.balancer.stochastic.maxRunningTime"; 120 protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds. 121 protected static final String KEEP_REGION_LOADS = 122 "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; 123 protected static final int DEFAULT_KEEP_REGION_LOADS = 15; 124 private static final String TABLE_FUNCTION_SEP = "_"; 125 protected static final String MIN_COST_NEED_BALANCE_KEY = 126 "hbase.master.balancer.stochastic.minCostNeedBalance"; 127 protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f; 128 protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY = 129 "hbase.master.balancer.stochastic.additionalCostFunctions"; 130 public static final String OVERALL_COST_FUNCTION_NAME = "Overall"; 131 132 Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>(); 133 134 // values are defaults 135 private int maxSteps = DEFAULT_MAX_STEPS; 136 private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS; 137 private int stepsPerRegion = DEFAULT_STEPS_PER_REGION; 138 private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME; 139 private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS; 140 private float 
minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE; 141 private boolean isBalancerDecisionRecording = false; 142 private boolean isBalancerRejectionRecording = false; 143 144 protected List<CandidateGenerator> candidateGenerators; 145 146 public enum GeneratorType { 147 RANDOM, 148 LOAD, 149 LOCALITY, 150 RACK 151 } 152 153 private double[] weightsOfGenerators; 154 private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC 155 // To save currently configed sum of multiplier. Defaulted at 1 for cases that carry high cost 156 private float sumMultiplier; 157 // to save and report costs to JMX 158 private double curOverallCost = 0d; 159 private double[] tempFunctionCosts; 160 private double[] curFunctionCosts; 161 162 // Keep locality based picker and cost function to alert them 163 // when new services are offered 164 private LocalityBasedCandidateGenerator localityCandidateGenerator; 165 private ServerLocalityCostFunction localityCost; 166 private RackLocalityCostFunction rackLocalityCost; 167 private RegionReplicaHostCostFunction regionReplicaHostCostFunction; 168 private RegionReplicaRackCostFunction regionReplicaRackCostFunction; 169 170 /** 171 * Use to add balancer decision history to ring-buffer 172 */ 173 NamedQueueRecorder namedQueueRecorder; 174 175 /** 176 * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its 177 * default MetricsBalancer 178 */ 179 public StochasticLoadBalancer() { 180 super(new MetricsStochasticBalancer()); 181 } 182 183 @RestrictedApi(explanation = "Should only be called in tests", link = "", 184 allowedOnPath = ".*/src/test/.*") 185 public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) { 186 super(metricsStochasticBalancer); 187 } 188 189 private static CostFunction createCostFunction(Class<? extends CostFunction> clazz, 190 Configuration conf) { 191 try { 192 Constructor<? 
extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class); 193 return ReflectionUtils.instantiate(clazz.getName(), ctor, conf); 194 } catch (NoSuchMethodException e) { 195 // will try construct with no parameter 196 } 197 return ReflectionUtils.newInstance(clazz); 198 } 199 200 private void loadCustomCostFunctions(Configuration conf) { 201 String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY); 202 203 if (null == functionsNames) { 204 return; 205 } 206 for (String className : functionsNames) { 207 Class<? extends CostFunction> clazz; 208 try { 209 clazz = Class.forName(className).asSubclass(CostFunction.class); 210 } catch (ClassNotFoundException e) { 211 LOG.warn("Cannot load class '{}': {}", className, e.getMessage()); 212 continue; 213 } 214 CostFunction func = createCostFunction(clazz, conf); 215 LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName()); 216 costFunctions.add(func); 217 } 218 } 219 220 @RestrictedApi(explanation = "Should only be called in tests", link = "", 221 allowedOnPath = ".*/src/test/.*") 222 List<CandidateGenerator> getCandidateGenerators() { 223 return this.candidateGenerators; 224 } 225 226 protected List<CandidateGenerator> createCandidateGenerators() { 227 List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4); 228 candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator()); 229 candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator()); 230 candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator); 231 candidateGenerators.add(GeneratorType.RACK.ordinal(), 232 new RegionReplicaRackCandidateGenerator()); 233 return candidateGenerators; 234 } 235 236 @Override 237 protected void loadConf(Configuration conf) { 238 super.loadConf(conf); 239 maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS); 240 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, 
DEFAULT_STEPS_PER_REGION); 241 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME); 242 runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS); 243 244 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS); 245 minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE); 246 localityCandidateGenerator = new LocalityBasedCandidateGenerator(); 247 localityCost = new ServerLocalityCostFunction(conf); 248 rackLocalityCost = new RackLocalityCostFunction(conf); 249 250 this.candidateGenerators = createCandidateGenerators(); 251 252 regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); 253 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); 254 costFunctions = new ArrayList<>(); 255 addCostFunction(new RegionCountSkewCostFunction(conf)); 256 addCostFunction(new PrimaryRegionCountSkewCostFunction(conf)); 257 addCostFunction(new MoveCostFunction(conf)); 258 addCostFunction(localityCost); 259 addCostFunction(rackLocalityCost); 260 addCostFunction(new TableSkewCostFunction(conf)); 261 addCostFunction(regionReplicaHostCostFunction); 262 addCostFunction(regionReplicaRackCostFunction); 263 addCostFunction(new ReadRequestCostFunction(conf)); 264 addCostFunction(new WriteRequestCostFunction(conf)); 265 addCostFunction(new MemStoreSizeCostFunction(conf)); 266 addCostFunction(new StoreFileCostFunction(conf)); 267 loadCustomCostFunctions(conf); 268 269 curFunctionCosts = new double[costFunctions.size()]; 270 tempFunctionCosts = new double[costFunctions.size()]; 271 272 isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED, 273 BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED); 274 isBalancerRejectionRecording = 275 conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED, 276 BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED); 277 278 if ( 279 this.namedQueueRecorder == 
null 280 && (isBalancerDecisionRecording || isBalancerRejectionRecording) 281 ) { 282 this.namedQueueRecorder = NamedQueueRecorder.getInstance(conf); 283 } 284 285 LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps 286 + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable=" 287 + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames()) 288 + " , sum of multiplier of cost functions = " + sumMultiplier + " etc."); 289 } 290 291 @Override 292 public synchronized void updateClusterMetrics(ClusterMetrics st) { 293 super.updateClusterMetrics(st); 294 updateRegionLoad(); 295 296 // update metrics size 297 try { 298 // by-table or ensemble mode 299 int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1; 300 int functionsCount = getCostFunctionNames().length; 301 302 updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall 303 } catch (Exception e) { 304 LOG.error("failed to get the size of all tables", e); 305 } 306 } 307 308 private void updateBalancerTableLoadInfo(TableName tableName, 309 Map<ServerName, List<RegionInfo>> loadOfOneTable) { 310 RegionLocationFinder finder = null; 311 if ( 312 (this.localityCost != null && this.localityCost.getMultiplier() > 0) 313 || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0) 314 ) { 315 finder = this.regionFinder; 316 } 317 BalancerClusterState cluster = 318 new BalancerClusterState(loadOfOneTable, loads, finder, rackManager); 319 320 initCosts(cluster); 321 curOverallCost = computeCost(cluster, Double.MAX_VALUE); 322 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 323 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 324 } 325 326 @Override 327 public void 328 updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) { 329 if (isByTable) { 330 loadOfAllTable.forEach((tableName, loadOfOneTable) 
-> { 331 updateBalancerTableLoadInfo(tableName, loadOfOneTable); 332 }); 333 } else { 334 updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME, 335 toEnsumbleTableLoad(loadOfAllTable)); 336 } 337 } 338 339 /** 340 * Update the number of metrics that are reported to JMX 341 */ 342 @RestrictedApi(explanation = "Should only be called in tests", link = "", 343 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 344 void updateMetricsSize(int size) { 345 if (metricsBalancer instanceof MetricsStochasticBalancer) { 346 ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size); 347 } 348 } 349 350 private boolean areSomeRegionReplicasColocated(BalancerClusterState c) { 351 regionReplicaHostCostFunction.prepare(c); 352 if (Math.abs(regionReplicaHostCostFunction.cost()) > CostFunction.COST_EPSILON) { 353 return true; 354 } 355 return (Math.abs(regionReplicaHostCostFunction.cost()) > CostFunction.COST_EPSILON); 356 } 357 358 @RestrictedApi(explanation = "Should only be called in tests", link = "", 359 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 360 boolean needsBalance(TableName tableName, BalancerClusterState cluster) { 361 ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); 362 if (cs.getNumServers() < MIN_SERVER_BALANCE) { 363 LOG.info( 364 "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)"); 365 sendRejectionReasonToRingBuffer("The number of RegionServers " + cs.getNumServers() 366 + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null); 367 return false; 368 } 369 if (areSomeRegionReplicasColocated(cluster)) { 370 LOG.info("Running balancer because at least one server hosts replicas of the same region." 371 + " function cost={}", functionCost()); 372 return true; 373 } 374 375 if (idleRegionServerExist(cluster)) { 376 LOG.info("Running balancer because cluster has idle server(s)." 
+ " function cost={}", 377 functionCost()); 378 return true; 379 } 380 381 if (sloppyRegionServerExist(cs)) { 382 LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}", 383 functionCost()); 384 return true; 385 } 386 387 double total = 0.0; 388 for (CostFunction c : costFunctions) { 389 if (!c.isNeeded()) { 390 LOG.trace("{} not needed", c.getClass().getSimpleName()); 391 continue; 392 } 393 total += c.cost() * c.getMultiplier(); 394 } 395 396 boolean balanced = (total / sumMultiplier < minCostNeedBalance); 397 if (balanced) { 398 if (isBalancerRejectionRecording) { 399 String reason = ""; 400 if (total <= 0) { 401 reason = 402 "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0"; 403 } else if (sumMultiplier <= 0) { 404 reason = "sumMultiplier = " + sumMultiplier + " <= 0"; 405 } else if ((total / sumMultiplier) < minCostNeedBalance) { 406 reason = 407 "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " 408 + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")"; 409 } 410 sendRejectionReasonToRingBuffer(reason, costFunctions); 411 } 412 LOG.info( 413 "{} - skipping load balancing because weighted average imbalance={} <= " 414 + "threshold({}). If you want more aggressive balancing, either lower " 415 + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative " 416 + "multiplier(s) of the specific cost function(s). functionCost={}", 417 isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier, 418 minCostNeedBalance, minCostNeedBalance, functionCost()); 419 } else { 420 LOG.info("{} - Calculating plan. may take up to {}ms to complete.", 421 isByTable ? 
"Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime); 422 } 423 return !balanced; 424 } 425 426 @RestrictedApi(explanation = "Should only be called in tests", link = "", 427 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 428 BalanceAction nextAction(BalancerClusterState cluster) { 429 return getRandomGenerator().generate(cluster); 430 } 431 432 /** 433 * Select the candidate generator to use based on the cost of cost functions. The chance of 434 * selecting a candidate generator is propotional to the share of cost of all cost functions among 435 * all cost functions that benefit from it. 436 */ 437 protected CandidateGenerator getRandomGenerator() { 438 double sum = 0; 439 for (int i = 0; i < weightsOfGenerators.length; i++) { 440 sum += weightsOfGenerators[i]; 441 weightsOfGenerators[i] = sum; 442 } 443 if (sum == 0) { 444 return candidateGenerators.get(0); 445 } 446 for (int i = 0; i < weightsOfGenerators.length; i++) { 447 weightsOfGenerators[i] /= sum; 448 } 449 double rand = ThreadLocalRandom.current().nextDouble(); 450 for (int i = 0; i < weightsOfGenerators.length; i++) { 451 if (rand <= weightsOfGenerators[i]) { 452 return candidateGenerators.get(i); 453 } 454 } 455 return candidateGenerators.get(candidateGenerators.size() - 1); 456 } 457 458 @RestrictedApi(explanation = "Should only be called in tests", link = "", 459 allowedOnPath = ".*/src/test/.*") 460 void setRackManager(RackManager rackManager) { 461 this.rackManager = rackManager; 462 } 463 464 private long calculateMaxSteps(BalancerClusterState cluster) { 465 return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers; 466 } 467 468 /** 469 * Given the cluster state this will try and approach an optimal balance. This should always 470 * approach the optimal state given enough steps. 
471 */ 472 @Override 473 protected List<RegionPlan> balanceTable(TableName tableName, 474 Map<ServerName, List<RegionInfo>> loadOfOneTable) { 475 List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable); 476 if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) { 477 return plans; 478 } 479 480 if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) { 481 if (loadOfOneTable.size() <= 2) { 482 return null; 483 } 484 loadOfOneTable = new HashMap<>(loadOfOneTable); 485 loadOfOneTable.remove(masterServerName); 486 } 487 488 // On clusters with lots of HFileLinks or lots of reference files, 489 // instantiating the storefile infos can be quite expensive. 490 // Allow turning this feature off if the locality cost is not going to 491 // be used in any computations. 492 RegionLocationFinder finder = null; 493 if ( 494 (this.localityCost != null && this.localityCost.getMultiplier() > 0) 495 || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0) 496 ) { 497 finder = this.regionFinder; 498 } 499 500 // The clusterState that is given to this method contains the state 501 // of all the regions in the table(s) (that's true today) 502 // Keep track of servers to iterate through them. 503 BalancerClusterState cluster = 504 new BalancerClusterState(loadOfOneTable, loads, finder, rackManager); 505 506 long startTime = EnvironmentEdgeManager.currentTime(); 507 508 initCosts(cluster); 509 sumMultiplier = 0; 510 for (CostFunction c : costFunctions) { 511 if (c.isNeeded()) { 512 sumMultiplier += c.getMultiplier(); 513 } 514 } 515 if (sumMultiplier <= 0) { 516 LOG.error("At least one cost function needs a multiplier > 0. 
For example, set " 517 + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default"); 518 return null; 519 } 520 521 double currentCost = computeCost(cluster, Double.MAX_VALUE); 522 curOverallCost = currentCost; 523 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 524 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 525 double initCost = currentCost; 526 double newCost; 527 528 if (!needsBalance(tableName, cluster)) { 529 return null; 530 } 531 532 long computedMaxSteps; 533 if (runMaxSteps) { 534 computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster)); 535 } else { 536 long calculatedMaxSteps = calculateMaxSteps(cluster); 537 computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps); 538 if (calculatedMaxSteps > maxSteps) { 539 LOG.warn( 540 "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than " 541 + "maxSteps:{}. Hence load balancing may not work well. Setting parameter " 542 + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue." 543 + "(This config change does not require service restart)", 544 calculatedMaxSteps, maxSteps); 545 } 546 } 547 LOG.info( 548 "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, " 549 + "functionCost={} computedMaxSteps={}", 550 currentCost / sumMultiplier, functionCost(), computedMaxSteps); 551 552 final String initFunctionTotalCosts = totalCostsPerFunc(); 553 // Perform a stochastic walk to see if we can get a good fit. 554 long step; 555 556 for (step = 0; step < computedMaxSteps; step++) { 557 BalanceAction action = nextAction(cluster); 558 559 if (action.getType() == BalanceAction.Type.NULL) { 560 continue; 561 } 562 563 cluster.doAction(action); 564 updateCostsAndWeightsWithAction(cluster, action); 565 566 newCost = computeCost(cluster, currentCost); 567 568 // Should this be kept? 
569 if (newCost < currentCost) { 570 currentCost = newCost; 571 572 // save for JMX 573 curOverallCost = currentCost; 574 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 575 } else { 576 // Put things back the way they were before. 577 // TODO: undo by remembering old values 578 BalanceAction undoAction = action.undoAction(); 579 cluster.doAction(undoAction); 580 updateCostsAndWeightsWithAction(cluster, undoAction); 581 } 582 583 if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) { 584 break; 585 } 586 } 587 long endTime = EnvironmentEdgeManager.currentTime(); 588 589 metricsBalancer.balanceCluster(endTime - startTime); 590 591 if (initCost > currentCost) { 592 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 593 plans = createRegionPlans(cluster); 594 LOG.info( 595 "Finished computing new moving plan. Computation took {} ms" 596 + " to try {} different iterations. Found a solution that moves " 597 + "{} regions; Going from a computed imbalance of {}" 598 + " to a new imbalance of {}. funtionCost={}", 599 endTime - startTime, step, plans.size(), initCost / sumMultiplier, 600 currentCost / sumMultiplier, functionCost()); 601 sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step); 602 return plans; 603 } 604 LOG.info( 605 "Could not find a better moving plan. 
Tried {} different configurations in " 606 + "{} ms, and did not find anything with an imbalance score less than {}", 607 step, endTime - startTime, initCost / sumMultiplier); 608 return null; 609 } 610 611 private void sendRejectionReasonToRingBuffer(String reason, List<CostFunction> costFunctions) { 612 if (this.isBalancerRejectionRecording) { 613 BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason); 614 if (costFunctions != null) { 615 for (CostFunction c : costFunctions) { 616 if (!c.isNeeded()) { 617 continue; 618 } 619 builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier()); 620 } 621 } 622 namedQueueRecorder.addRecord(new BalancerRejectionDetails(builder.build())); 623 } 624 } 625 626 private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost, 627 double initCost, String initFunctionTotalCosts, long step) { 628 if (this.isBalancerDecisionRecording) { 629 List<String> regionPlans = new ArrayList<>(); 630 for (RegionPlan plan : plans) { 631 regionPlans 632 .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName() 633 + " , source: " + plan.getSource() + " , destination: " + plan.getDestination()); 634 } 635 BalancerDecision balancerDecision = new BalancerDecision.Builder().setInitTotalCost(initCost) 636 .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost) 637 .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step) 638 .setRegionPlans(regionPlans).build(); 639 namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision)); 640 } 641 } 642 643 /** 644 * update costs to JMX 645 */ 646 private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) { 647 if (tableName == null) { 648 return; 649 } 650 651 // check if the metricsBalancer is MetricsStochasticBalancer before casting 652 if (metricsBalancer instanceof MetricsStochasticBalancer) { 653 MetricsStochasticBalancer 
balancer = (MetricsStochasticBalancer) metricsBalancer; 654 // overall cost 655 balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME, 656 "Overall cost", overall); 657 658 // each cost function 659 for (int i = 0; i < costFunctions.size(); i++) { 660 CostFunction costFunction = costFunctions.get(i); 661 String costFunctionName = costFunction.getClass().getSimpleName(); 662 double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall); 663 // TODO: cost function may need a specific description 664 balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName, 665 "The percent of " + costFunctionName, costPercent); 666 } 667 } 668 } 669 670 private void addCostFunction(CostFunction costFunction) { 671 float multiplier = costFunction.getMultiplier(); 672 if (multiplier > 0) { 673 costFunctions.add(costFunction); 674 } 675 } 676 677 private String functionCost() { 678 StringBuilder builder = new StringBuilder(); 679 for (CostFunction c : costFunctions) { 680 builder.append(c.getClass().getSimpleName()); 681 builder.append(" : ("); 682 if (c.isNeeded()) { 683 builder.append("multiplier=" + c.getMultiplier()); 684 builder.append(", "); 685 double cost = c.cost(); 686 builder.append("imbalance=" + cost); 687 if (cost >= minCostNeedBalance) { 688 builder.append(", need balance"); 689 } 690 } else { 691 builder.append("not needed"); 692 } 693 builder.append("); "); 694 } 695 return builder.toString(); 696 } 697 698 private String totalCostsPerFunc() { 699 StringBuilder builder = new StringBuilder(); 700 for (CostFunction c : costFunctions) { 701 if (!c.isNeeded()) { 702 continue; 703 } 704 double cost = c.getMultiplier() * c.cost(); 705 if (cost > 0.0) { 706 builder.append(" "); 707 builder.append(c.getClass().getSimpleName()); 708 builder.append(" : "); 709 builder.append(cost); 710 builder.append(";"); 711 } 712 } 713 if (builder.length() > 0) { 714 builder.deleteCharAt(builder.length() - 1); 715 } 716 return 
builder.toString(); 717 } 718 719 /** 720 * Create all of the RegionPlan's needed to move from the initial cluster state to the desired 721 * state. 722 * @param cluster The state of the cluster 723 * @return List of RegionPlan's that represent the moves needed to get to desired final state. 724 */ 725 private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) { 726 List<RegionPlan> plans = new ArrayList<>(); 727 for (int regionIndex = 0; regionIndex 728 < cluster.regionIndexToServerIndex.length; regionIndex++) { 729 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex]; 730 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex]; 731 732 if (initialServerIndex != newServerIndex) { 733 RegionInfo region = cluster.regions[regionIndex]; 734 ServerName initialServer = cluster.servers[initialServerIndex]; 735 ServerName newServer = cluster.servers[newServerIndex]; 736 737 if (LOG.isTraceEnabled()) { 738 LOG.trace("Moving Region " + region.getEncodedName() + " from server " 739 + initialServer.getHostname() + " to " + newServer.getHostname()); 740 } 741 RegionPlan rp = new RegionPlan(region, initialServer, newServer); 742 plans.add(rp); 743 } 744 } 745 return plans; 746 } 747 748 /** 749 * Store the current region loads. 750 */ 751 private void updateRegionLoad() { 752 // We create a new hashmap so that regions that are no longer there are removed. 753 // However we temporarily need the old loads so we can use them to keep the rolling average. 
754 Map<String, Deque<BalancerRegionLoad>> oldLoads = loads; 755 loads = new HashMap<>(); 756 757 clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> { 758 sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> { 759 String regionNameAsString = RegionInfo.getRegionNameAsString(regionName); 760 Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString); 761 if (rLoads == null) { 762 rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1); 763 } else if (rLoads.size() >= numRegionLoadsToRemember) { 764 rLoads.remove(); 765 } 766 rLoads.add(new BalancerRegionLoad(rm)); 767 loads.put(regionNameAsString, rLoads); 768 }); 769 }); 770 } 771 772 @RestrictedApi(explanation = "Should only be called in tests", link = "", 773 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 774 void initCosts(BalancerClusterState cluster) { 775 // Initialize the weights of generator every time 776 weightsOfGenerators = new double[this.candidateGenerators.size()]; 777 for (CostFunction c : costFunctions) { 778 c.prepare(cluster); 779 c.updateWeight(weightsOfGenerators); 780 } 781 } 782 783 @RestrictedApi(explanation = "Should only be called in tests", link = "", 784 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 785 void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) { 786 // Reset all the weights to 0 787 for (int i = 0; i < weightsOfGenerators.length; i++) { 788 weightsOfGenerators[i] = 0; 789 } 790 for (CostFunction c : costFunctions) { 791 if (c.isNeeded()) { 792 c.postAction(action); 793 c.updateWeight(weightsOfGenerators); 794 } 795 } 796 } 797 798 /** 799 * Get the names of the cost functions 800 */ 801 @RestrictedApi(explanation = "Should only be called in tests", link = "", 802 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 803 String[] getCostFunctionNames() { 804 String[] ret = new String[costFunctions.size()]; 805 for (int i = 0; i < 
costFunctions.size(); i++) { 806 CostFunction c = costFunctions.get(i); 807 ret[i] = c.getClass().getSimpleName(); 808 } 809 810 return ret; 811 } 812 813 /** 814 * This is the main cost function. It will compute a cost associated with a proposed cluster 815 * state. All different costs will be combined with their multipliers to produce a double cost. 816 * @param cluster The state of the cluster 817 * @param previousCost the previous cost. This is used as an early out. 818 * @return a double of a cost associated with the proposed cluster state. This cost is an 819 * aggregate of all individual cost functions. 820 */ 821 @RestrictedApi(explanation = "Should only be called in tests", link = "", 822 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 823 double computeCost(BalancerClusterState cluster, double previousCost) { 824 double total = 0; 825 826 for (int i = 0; i < costFunctions.size(); i++) { 827 CostFunction c = costFunctions.get(i); 828 this.tempFunctionCosts[i] = 0.0; 829 830 if (!c.isNeeded()) { 831 continue; 832 } 833 834 Float multiplier = c.getMultiplier(); 835 double cost = c.cost(); 836 837 this.tempFunctionCosts[i] = multiplier * cost; 838 total += this.tempFunctionCosts[i]; 839 840 if (total > previousCost) { 841 break; 842 } 843 } 844 845 return total; 846 } 847 848 /** 849 * A helper function to compose the attribute name from tablename and costfunction name 850 */ 851 static String composeAttributeName(String tableName, String costFunctionName) { 852 return tableName + TABLE_FUNCTION_SEP + costFunctionName; 853 } 854}