001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.util.ArrayDeque; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Deque; 025import java.util.HashMap; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Objects; 030import java.util.Random; 031import java.util.stream.Collectors; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.ClusterMetrics; 035import org.apache.hadoop.hbase.HBaseInterfaceAudience; 036import org.apache.hadoop.hbase.RegionMetrics; 037import org.apache.hadoop.hbase.ServerMetrics; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.master.MasterServices; 042import org.apache.hadoop.hbase.master.RegionPlan; 043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action; 044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; 045import 
org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction; 046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType; 047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; 048import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.hadoop.hbase.util.ReflectionUtils; 051import org.apache.yetus.audience.InterfaceAudience; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 056import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 057 058 059/** 060 * <p>This is a best effort load balancer. Given a Cost function F(C) => x It will 061 * randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the 062 * new cluster state becomes the plan. It includes costs functions to compute the cost of:</p> 063 * <ul> 064 * <li>Region Load</li> 065 * <li>Table Load</li> 066 * <li>Data Locality</li> 067 * <li>Memstore Sizes</li> 068 * <li>Storefile Sizes</li> 069 * </ul> 070 * 071 * 072 * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost 073 * best solution, and 1 is the highest possible cost and the worst solution. 
The computed costs are
 * scaled by their respective multipliers:</p>
 *
 * <ul>
 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
 *   <li>hbase.master.balancer.stochastic.moveCost</li>
 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
 *   <li>hbase.master.balancer.stochastic.localityCost</li>
 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
 * </ul>
 *
 * <p>You can also add custom cost functions by setting the following configuration value:</p>
 * <ul>
 *   <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
 * </ul>
 *
 * <p>All custom cost functions need to extend {@link StochasticLoadBalancer.CostFunction}</p>
 *
 * <p>In addition to the above configurations, the balancer can be tuned by the following
 * configuration values:</p>
 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
 *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
 *   regions is multiplied to try and get the number of times the balancer will
 *   mutate all servers.</li>
 *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
 *   the balancer will try and mutate all the servers.
The balancer will use the minimum of this 103 * value and the above computation.</li> 104 * </ul> 105 * 106 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false 107 * so that the balancer gets the full picture of all loads on the cluster.</p> 108 */ 109@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 110@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC", 111 justification="Complaint is about costFunctions not being synchronized; not end of the world") 112public class StochasticLoadBalancer extends BaseLoadBalancer { 113 114 protected static final String STEPS_PER_REGION_KEY = 115 "hbase.master.balancer.stochastic.stepsPerRegion"; 116 protected static final String MAX_STEPS_KEY = 117 "hbase.master.balancer.stochastic.maxSteps"; 118 protected static final String RUN_MAX_STEPS_KEY = 119 "hbase.master.balancer.stochastic.runMaxSteps"; 120 protected static final String MAX_RUNNING_TIME_KEY = 121 "hbase.master.balancer.stochastic.maxRunningTime"; 122 protected static final String KEEP_REGION_LOADS = 123 "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; 124 private static final String TABLE_FUNCTION_SEP = "_"; 125 protected static final String MIN_COST_NEED_BALANCE_KEY = 126 "hbase.master.balancer.stochastic.minCostNeedBalance"; 127 protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY = 128 "hbase.master.balancer.stochastic.additionalCostFunctions"; 129 130 protected static final Random RANDOM = new Random(System.currentTimeMillis()); 131 private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class); 132 133 Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>(); 134 135 // values are defaults 136 private int maxSteps = 1000000; 137 private boolean runMaxSteps = false; 138 private int stepsPerRegion = 800; 139 private long maxRunningTime = 30 * 1000 * 1; // 30 seconds. 
140 private int numRegionLoadsToRemember = 15; 141 private float minCostNeedBalance = 0.05f; 142 143 private List<CandidateGenerator> candidateGenerators; 144 private CostFromRegionLoadFunction[] regionLoadFunctions; 145 private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC 146 147 // to save and report costs to JMX 148 private Double curOverallCost = 0d; 149 private Double[] tempFunctionCosts; 150 private Double[] curFunctionCosts; 151 152 // Keep locality based picker and cost function to alert them 153 // when new services are offered 154 private LocalityBasedCandidateGenerator localityCandidateGenerator; 155 private ServerLocalityCostFunction localityCost; 156 private RackLocalityCostFunction rackLocalityCost; 157 private RegionReplicaHostCostFunction regionReplicaHostCostFunction; 158 private RegionReplicaRackCostFunction regionReplicaRackCostFunction; 159 160 /** 161 * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its 162 * default MetricsBalancer 163 */ 164 public StochasticLoadBalancer() { 165 super(new MetricsStochasticBalancer()); 166 } 167 168 @Override 169 public void onConfigurationChange(Configuration conf) { 170 setConf(conf); 171 } 172 173 @Override 174 public synchronized void setConf(Configuration conf) { 175 super.setConf(conf); 176 maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps); 177 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); 178 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime); 179 runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps); 180 181 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); 182 minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance); 183 if (localityCandidateGenerator == null) { 184 localityCandidateGenerator = new LocalityBasedCandidateGenerator(services); 185 } 186 localityCost = new ServerLocalityCostFunction(conf, services); 187 
rackLocalityCost = new RackLocalityCostFunction(conf, services); 188 189 if (this.candidateGenerators == null) { 190 candidateGenerators = Lists.newArrayList(); 191 candidateGenerators.add(new RandomCandidateGenerator()); 192 candidateGenerators.add(new LoadCandidateGenerator()); 193 candidateGenerators.add(localityCandidateGenerator); 194 candidateGenerators.add(new RegionReplicaRackCandidateGenerator()); 195 } 196 regionLoadFunctions = new CostFromRegionLoadFunction[] { 197 new ReadRequestCostFunction(conf), 198 new WriteRequestCostFunction(conf), 199 new MemStoreSizeCostFunction(conf), 200 new StoreFileCostFunction(conf) 201 }; 202 regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); 203 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); 204 205 costFunctions = new ArrayList<>(); 206 costFunctions.add(new RegionCountSkewCostFunction(conf)); 207 costFunctions.add(new PrimaryRegionCountSkewCostFunction(conf)); 208 costFunctions.add(new MoveCostFunction(conf)); 209 costFunctions.add(localityCost); 210 costFunctions.add(rackLocalityCost); 211 costFunctions.add(new TableSkewCostFunction(conf)); 212 costFunctions.add(regionReplicaHostCostFunction); 213 costFunctions.add(regionReplicaRackCostFunction); 214 costFunctions.add(regionLoadFunctions[0]); 215 costFunctions.add(regionLoadFunctions[1]); 216 costFunctions.add(regionLoadFunctions[2]); 217 costFunctions.add(regionLoadFunctions[3]); 218 loadCustomCostFunctions(conf); 219 220 curFunctionCosts = new Double[costFunctions.size()]; 221 tempFunctionCosts = new Double[costFunctions.size()]; 222 223 LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion + 224 ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" + 225 Arrays.toString(getCostFunctionNames()) + " etc."); 226 } 227 228 private void loadCustomCostFunctions(Configuration conf) { 229 String[] functionsNames = 
conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY); 230 231 if (null == functionsNames) { 232 return; 233 } 234 235 costFunctions.addAll(Arrays.stream(functionsNames) 236 .map(c -> { 237 Class<? extends CostFunction> klass = null; 238 try { 239 klass = (Class<? extends CostFunction>) Class.forName(c); 240 } catch (ClassNotFoundException e) { 241 LOG.warn("Cannot load class " + c + "': " + e.getMessage()); 242 } 243 if (null == klass) { 244 return null; 245 } 246 247 CostFunction reflected = ReflectionUtils.newInstance(klass, conf); 248 LOG.info("Successfully loaded custom CostFunction '" + 249 reflected.getClass().getSimpleName() + "'"); 250 251 return reflected; 252 }) 253 .filter(Objects::nonNull) 254 .collect(Collectors.toList())); 255 } 256 257 protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) { 258 this.candidateGenerators = customCandidateGenerators; 259 } 260 261 @Override 262 protected void setSlop(Configuration conf) { 263 this.slop = conf.getFloat("hbase.regions.slop", 0.001F); 264 } 265 266 @Override 267 public synchronized void setClusterMetrics(ClusterMetrics st) { 268 super.setClusterMetrics(st); 269 updateRegionLoad(); 270 for(CostFromRegionLoadFunction cost : regionLoadFunctions) { 271 cost.setClusterMetrics(st); 272 } 273 274 // update metrics size 275 try { 276 // by-table or ensemble mode 277 int tablesCount = isByTable ? 
services.getTableDescriptors().getAll().size() : 1; 278 int functionsCount = getCostFunctionNames().length; 279 280 updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall 281 } catch (Exception e) { 282 LOG.error("failed to get the size of all tables", e); 283 } 284 } 285 286 /** 287 * Update the number of metrics that are reported to JMX 288 */ 289 public void updateMetricsSize(int size) { 290 if (metricsBalancer instanceof MetricsStochasticBalancer) { 291 ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size); 292 } 293 } 294 295 @Override 296 public synchronized void setMasterServices(MasterServices masterServices) { 297 super.setMasterServices(masterServices); 298 this.localityCost.setServices(masterServices); 299 this.rackLocalityCost.setServices(masterServices); 300 this.localityCandidateGenerator.setServices(masterServices); 301 } 302 303 @Override 304 protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) { 305 regionReplicaHostCostFunction.init(c); 306 if (regionReplicaHostCostFunction.cost() > 0) return true; 307 regionReplicaRackCostFunction.init(c); 308 if (regionReplicaRackCostFunction.cost() > 0) return true; 309 return false; 310 } 311 312 @Override 313 protected boolean needsBalance(TableName tableName, Cluster cluster) { 314 ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); 315 if (cs.getNumServers() < MIN_SERVER_BALANCE) { 316 if (LOG.isDebugEnabled()) { 317 LOG.debug("Not running balancer because only " + cs.getNumServers() 318 + " active regionserver(s)"); 319 } 320 return false; 321 } 322 if (areSomeRegionReplicasColocated(cluster)) { 323 return true; 324 } 325 326 if (idleRegionServerExist(cluster)){ 327 return true; 328 } 329 330 double total = 0.0; 331 float sumMultiplier = 0.0f; 332 for (CostFunction c : costFunctions) { 333 float multiplier = c.getMultiplier(); 334 if (multiplier <= 0) { 335 LOG.trace("{} not needed because multiplier is <= 0", 
c.getClass().getSimpleName()); 336 continue; 337 } 338 if (!c.isNeeded()) { 339 LOG.trace("{} not needed", c.getClass().getSimpleName()); 340 continue; 341 } 342 sumMultiplier += multiplier; 343 total += c.cost() * multiplier; 344 } 345 346 boolean balanced = total <= 0 || sumMultiplier <= 0 || 347 (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance); 348 if (LOG.isDebugEnabled()) { 349 LOG.debug("{} {}; total cost={}, sum multiplier={}; cost/multiplier to need a balance is {}", 350 balanced ? "Skipping load balancing because balanced" : "We need to load balance", 351 isByTable ? String.format("table (%s)", tableName) : "cluster", 352 total, sumMultiplier, minCostNeedBalance); 353 if (LOG.isTraceEnabled()) { 354 LOG.trace("Balance decision detailed function costs={}", functionCost()); 355 } 356 } 357 return !balanced; 358 } 359 360 @VisibleForTesting 361 Cluster.Action nextAction(Cluster cluster) { 362 return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size())) 363 .generate(cluster); 364 } 365 366 /** 367 * Given the cluster state this will try and approach an optimal balance. This 368 * should always approach the optimal state given enough steps. 369 */ 370 @Override 371 public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<ServerName, 372 List<RegionInfo>> loadOfOneTable) { 373 List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable); 374 if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) { 375 return plans; 376 } 377 378 if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) { 379 if (loadOfOneTable.size() <= 2) { 380 return null; 381 } 382 loadOfOneTable = new HashMap<>(loadOfOneTable); 383 loadOfOneTable.remove(masterServerName); 384 } 385 386 // On clusters with lots of HFileLinks or lots of reference files, 387 // instantiating the storefile infos can be quite expensive. 
388 // Allow turning this feature off if the locality cost is not going to 389 // be used in any computations. 390 RegionLocationFinder finder = null; 391 if ((this.localityCost != null && this.localityCost.getMultiplier() > 0) 392 || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) { 393 finder = this.regionFinder; 394 } 395 396 //The clusterState that is given to this method contains the state 397 //of all the regions in the table(s) (that's true today) 398 // Keep track of servers to iterate through them. 399 Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager); 400 401 long startTime = EnvironmentEdgeManager.currentTime(); 402 403 initCosts(cluster); 404 405 if (!needsBalance(tableName, cluster)) { 406 return null; 407 } 408 409 double currentCost = computeCost(cluster, Double.MAX_VALUE); 410 curOverallCost = currentCost; 411 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 412 double initCost = currentCost; 413 double newCost = currentCost; 414 415 long computedMaxSteps; 416 if (runMaxSteps) { 417 computedMaxSteps = Math.max(this.maxSteps, 418 ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); 419 } else { 420 long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion * 421 (long)cluster.numServers; 422 computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps); 423 if (calculatedMaxSteps > maxSteps) { 424 LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than " 425 + "maxSteps:{}. Hence load balancing may not work well. Setting parameter " 426 + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue." 
427 + "(This config change does not require service restart)", calculatedMaxSteps, 428 maxSteps); 429 } 430 } 431 LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost=" 432 + functionCost() + " computedMaxSteps: " + computedMaxSteps); 433 434 // Perform a stochastic walk to see if we can get a good fit. 435 long step; 436 437 for (step = 0; step < computedMaxSteps; step++) { 438 Cluster.Action action = nextAction(cluster); 439 440 if (action.type == Type.NULL) { 441 continue; 442 } 443 444 cluster.doAction(action); 445 updateCostsWithAction(cluster, action); 446 447 newCost = computeCost(cluster, currentCost); 448 449 // Should this be kept? 450 if (newCost < currentCost) { 451 currentCost = newCost; 452 453 // save for JMX 454 curOverallCost = currentCost; 455 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 456 } else { 457 // Put things back the way they were before. 458 // TODO: undo by remembering old values 459 Action undoAction = action.undoAction(); 460 cluster.doAction(undoAction); 461 updateCostsWithAction(cluster, undoAction); 462 } 463 464 if (EnvironmentEdgeManager.currentTime() - startTime > 465 maxRunningTime) { 466 break; 467 } 468 } 469 long endTime = EnvironmentEdgeManager.currentTime(); 470 471 metricsBalancer.balanceCluster(endTime - startTime); 472 473 // update costs metrics 474 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 475 if (initCost > currentCost) { 476 plans = createRegionPlans(cluster); 477 LOG.info("Finished computing new load balance plan. Computation took {}" + 478 " to try {} different iterations. Found a solution that moves " + 479 "{} regions; Going from a computed cost of {}" + 480 " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime), 481 step, plans.size(), initCost, currentCost); 482 return plans; 483 } 484 LOG.info("Could not find a better load balance plan. 
Tried {} different configurations in " + 485 "{}, and did not find anything with a computed cost less than {}", step, 486 java.time.Duration.ofMillis(endTime - startTime), initCost); 487 return null; 488 } 489 490 /** 491 * update costs to JMX 492 */ 493 private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) { 494 if (tableName == null) return; 495 496 // check if the metricsBalancer is MetricsStochasticBalancer before casting 497 if (metricsBalancer instanceof MetricsStochasticBalancer) { 498 MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer; 499 // overall cost 500 balancer.updateStochasticCost(tableName.getNameAsString(), 501 "Overall", "Overall cost", overall); 502 503 // each cost function 504 for (int i = 0; i < costFunctions.size(); i++) { 505 CostFunction costFunction = costFunctions.get(i); 506 String costFunctionName = costFunction.getClass().getSimpleName(); 507 Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall); 508 // TODO: cost function may need a specific description 509 balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName, 510 "The percent of " + costFunctionName, costPercent); 511 } 512 } 513 } 514 515 private String functionCost() { 516 StringBuilder builder = new StringBuilder(); 517 for (CostFunction c:costFunctions) { 518 builder.append(c.getClass().getSimpleName()); 519 builder.append(" : ("); 520 builder.append(c.getMultiplier()); 521 builder.append(", "); 522 builder.append(c.cost()); 523 builder.append("); "); 524 } 525 return builder.toString(); 526 } 527 528 /** 529 * Create all of the RegionPlan's needed to move from the initial cluster state to the desired 530 * state. 531 * 532 * @param cluster The state of the cluster 533 * @return List of RegionPlan's that represent the moves needed to get to desired final state. 
534 */ 535 private List<RegionPlan> createRegionPlans(Cluster cluster) { 536 List<RegionPlan> plans = new LinkedList<>(); 537 for (int regionIndex = 0; 538 regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) { 539 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex]; 540 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex]; 541 542 if (initialServerIndex != newServerIndex) { 543 RegionInfo region = cluster.regions[regionIndex]; 544 ServerName initialServer = cluster.servers[initialServerIndex]; 545 ServerName newServer = cluster.servers[newServerIndex]; 546 547 if (LOG.isTraceEnabled()) { 548 LOG.trace("Moving Region " + region.getEncodedName() + " from server " 549 + initialServer.getHostname() + " to " + newServer.getHostname()); 550 } 551 RegionPlan rp = new RegionPlan(region, initialServer, newServer); 552 plans.add(rp); 553 } 554 } 555 return plans; 556 } 557 558 /** 559 * Store the current region loads. 560 */ 561 private synchronized void updateRegionLoad() { 562 // We create a new hashmap so that regions that are no longer there are removed. 563 // However we temporarily need the old loads so we can use them to keep the rolling average. 
564 Map<String, Deque<BalancerRegionLoad>> oldLoads = loads; 565 loads = new HashMap<>(); 566 567 clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> { 568 sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> { 569 String regionNameAsString = RegionInfo.getRegionNameAsString(regionName); 570 Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString); 571 if (rLoads == null) { 572 rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1); 573 } else if (rLoads.size() >= numRegionLoadsToRemember) { 574 rLoads.remove(); 575 } 576 rLoads.add(new BalancerRegionLoad(rm)); 577 loads.put(regionNameAsString, rLoads); 578 }); 579 }); 580 581 for(CostFromRegionLoadFunction cost : regionLoadFunctions) { 582 cost.setLoads(loads); 583 } 584 } 585 586 protected void initCosts(Cluster cluster) { 587 for (CostFunction c:costFunctions) { 588 c.init(cluster); 589 } 590 } 591 592 protected void updateCostsWithAction(Cluster cluster, Action action) { 593 for (CostFunction c : costFunctions) { 594 c.postAction(action); 595 } 596 } 597 598 /** 599 * Get the names of the cost functions 600 */ 601 public String[] getCostFunctionNames() { 602 if (costFunctions == null) return null; 603 String[] ret = new String[costFunctions.size()]; 604 for (int i = 0; i < costFunctions.size(); i++) { 605 CostFunction c = costFunctions.get(i); 606 ret[i] = c.getClass().getSimpleName(); 607 } 608 609 return ret; 610 } 611 612 /** 613 * This is the main cost function. It will compute a cost associated with a proposed cluster 614 * state. All different costs will be combined with their multipliers to produce a double cost. 615 * 616 * @param cluster The state of the cluster 617 * @param previousCost the previous cost. This is used as an early out. 618 * @return a double of a cost associated with the proposed cluster state. This cost is an 619 * aggregate of all individual cost functions. 
620 */ 621 protected double computeCost(Cluster cluster, double previousCost) { 622 double total = 0; 623 624 for (int i = 0; i < costFunctions.size(); i++) { 625 CostFunction c = costFunctions.get(i); 626 this.tempFunctionCosts[i] = 0.0; 627 628 if (c.getMultiplier() <= 0) { 629 continue; 630 } 631 632 Float multiplier = c.getMultiplier(); 633 Double cost = c.cost(); 634 635 this.tempFunctionCosts[i] = multiplier*cost; 636 total += this.tempFunctionCosts[i]; 637 638 if (total > previousCost) { 639 break; 640 } 641 } 642 643 return total; 644 } 645 646 static class RandomCandidateGenerator extends CandidateGenerator { 647 648 @Override 649 Cluster.Action generate(Cluster cluster) { 650 651 int thisServer = pickRandomServer(cluster); 652 653 // Pick the other server 654 int otherServer = pickOtherRandomServer(cluster, thisServer); 655 656 return pickRandomRegions(cluster, thisServer, otherServer); 657 } 658 } 659 660 /** 661 * Generates candidates which moves the replicas out of the rack for 662 * co-hosted region replicas in the same rack 663 */ 664 static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator { 665 @Override 666 Cluster.Action generate(Cluster cluster) { 667 int rackIndex = pickRandomRack(cluster); 668 if (cluster.numRacks <= 1 || rackIndex == -1) { 669 return super.generate(cluster); 670 } 671 672 int regionIndex = selectCoHostedRegionPerGroup( 673 cluster.primariesOfRegionsPerRack[rackIndex], 674 cluster.regionsPerRack[rackIndex], 675 cluster.regionIndexToPrimaryIndex); 676 677 // if there are no pairs of region replicas co-hosted, default to random generator 678 if (regionIndex == -1) { 679 // default to randompicker 680 return randomGenerator.generate(cluster); 681 } 682 683 int serverIndex = cluster.regionIndexToServerIndex[regionIndex]; 684 int toRackIndex = pickOtherRandomRack(cluster, rackIndex); 685 686 int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length); 687 int toServerIndex = 
cluster.serversPerRack[toRackIndex][rand]; 688 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f); 689 return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex); 690 } 691 } 692 693 /** 694 * Base class of StochasticLoadBalancer's Cost Functions. 695 */ 696 public abstract static class CostFunction { 697 698 private float multiplier = 0; 699 700 protected Cluster cluster; 701 702 public CostFunction(Configuration c) { 703 } 704 705 boolean isNeeded() { 706 return true; 707 } 708 float getMultiplier() { 709 return multiplier; 710 } 711 712 void setMultiplier(float m) { 713 this.multiplier = m; 714 } 715 716 /** Called once per LB invocation to give the cost function 717 * to initialize it's state, and perform any costly calculation. 718 */ 719 void init(Cluster cluster) { 720 this.cluster = cluster; 721 } 722 723 /** Called once per cluster Action to give the cost function 724 * an opportunity to update it's state. postAction() is always 725 * called at least once before cost() is called with the cluster 726 * that this action is performed on. 
*/ 727 void postAction(Action action) { 728 switch (action.type) { 729 case NULL: break; 730 case ASSIGN_REGION: 731 AssignRegionAction ar = (AssignRegionAction) action; 732 regionMoved(ar.region, -1, ar.server); 733 break; 734 case MOVE_REGION: 735 MoveRegionAction mra = (MoveRegionAction) action; 736 regionMoved(mra.region, mra.fromServer, mra.toServer); 737 break; 738 case SWAP_REGIONS: 739 SwapRegionsAction a = (SwapRegionsAction) action; 740 regionMoved(a.fromRegion, a.fromServer, a.toServer); 741 regionMoved(a.toRegion, a.toServer, a.fromServer); 742 break; 743 default: 744 throw new RuntimeException("Uknown action:" + action.type); 745 } 746 } 747 748 protected void regionMoved(int region, int oldServer, int newServer) { 749 } 750 751 protected abstract double cost(); 752 753 @SuppressWarnings("checkstyle:linelength") 754 /** 755 * Function to compute a scaled cost using 756 * {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}. 757 * It assumes that this is a zero sum set of costs. It assumes that the worst case 758 * possible is all of the elements in one region server and the rest having 0. 759 * 760 * @param stats the costs 761 * @return a scaled set of costs. 762 */ 763 protected double costFromArray(double[] stats) { 764 double totalCost = 0; 765 double total = getSum(stats); 766 767 double count = stats.length; 768 double mean = total/count; 769 770 // Compute max as if all region servers had 0 and one had the sum of all costs. This must be 771 // a zero sum cost for this to make sense. 772 double max = ((count - 1) * mean) + (total - mean); 773 774 // It's possible that there aren't enough regions to go around 775 double min; 776 if (count > total) { 777 min = ((count - total) * mean) + ((1 - mean) * total); 778 } else { 779 // Some will have 1 more than everything else. 
780 int numHigh = (int) (total - (Math.floor(mean) * count)); 781 int numLow = (int) (count - numHigh); 782 783 min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean))); 784 785 } 786 min = Math.max(0, min); 787 for (int i=0; i<stats.length; i++) { 788 double n = stats[i]; 789 double diff = Math.abs(mean - n); 790 totalCost += diff; 791 } 792 793 double scaled = scale(min, max, totalCost); 794 return scaled; 795 } 796 797 private double getSum(double[] stats) { 798 double total = 0; 799 for(double s:stats) { 800 total += s; 801 } 802 return total; 803 } 804 805 /** 806 * Scale the value between 0 and 1. 807 * 808 * @param min Min value 809 * @param max The Max value 810 * @param value The value to be scaled. 811 * @return The scaled value. 812 */ 813 protected double scale(double min, double max, double value) { 814 if (max <= min || value <= min) { 815 return 0; 816 } 817 if ((max - min) == 0) return 0; 818 819 return Math.max(0d, Math.min(1d, (value - min) / (max - min))); 820 } 821 } 822 823 /** 824 * Given the starting state of the regions and a potential ending state 825 * compute cost based upon the number of regions that have moved. 826 */ 827 static class MoveCostFunction extends CostFunction { 828 private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost"; 829 private static final String MAX_MOVES_PERCENT_KEY = 830 "hbase.master.balancer.stochastic.maxMovePercent"; 831 private static final float DEFAULT_MOVE_COST = 7; 832 private static final int DEFAULT_MAX_MOVES = 600; 833 private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f; 834 835 private final float maxMovesPercent; 836 837 MoveCostFunction(Configuration conf) { 838 super(conf); 839 840 // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure 841 // that large benefits are need to overcome the cost of a move. 
842 this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST)); 843 // What percent of the number of regions a single run of the balancer can move. 844 maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT); 845 } 846 847 @Override 848 protected double cost() { 849 // Try and size the max number of Moves, but always be prepared to move some. 850 int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent), 851 DEFAULT_MAX_MOVES); 852 853 double moveCost = cluster.numMovedRegions; 854 855 // Don't let this single balance move more than the max moves. 856 // This allows better scaling to accurately represent the actual cost of a move. 857 if (moveCost > maxMoves) { 858 return 1000000; // return a number much greater than any of the other cost 859 } 860 861 return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost); 862 } 863 } 864 865 /** 866 * Compute the cost of a potential cluster state from skew in number of 867 * regions on a cluster. 868 */ 869 static class RegionCountSkewCostFunction extends CostFunction { 870 static final String REGION_COUNT_SKEW_COST_KEY = 871 "hbase.master.balancer.stochastic.regionCountCost"; 872 static final float DEFAULT_REGION_COUNT_SKEW_COST = 500; 873 874 private double[] stats = null; 875 876 RegionCountSkewCostFunction(Configuration conf) { 877 super(conf); 878 // Load multiplier should be the greatest as it is the most general way to balance data. 
879 this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST)); 880 } 881 882 @Override 883 void init(Cluster cluster) { 884 super.init(cluster); 885 LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(), 886 cluster.numServers, cluster.numRegions); 887 if (LOG.isTraceEnabled()) { 888 for (int i =0; i < cluster.numServers; i++) { 889 LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(), 890 cluster.servers[i], cluster.regionsPerServer[i].length); 891 } 892 } 893 } 894 895 @Override 896 protected double cost() { 897 if (stats == null || stats.length != cluster.numServers) { 898 stats = new double[cluster.numServers]; 899 } 900 for (int i =0; i < cluster.numServers; i++) { 901 stats[i] = cluster.regionsPerServer[i].length; 902 } 903 return costFromArray(stats); 904 } 905 } 906 907 /** 908 * Compute the cost of a potential cluster state from skew in number of 909 * primary regions on a cluster. 910 */ 911 static class PrimaryRegionCountSkewCostFunction extends CostFunction { 912 private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY = 913 "hbase.master.balancer.stochastic.primaryRegionCountCost"; 914 private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500; 915 916 private double[] stats = null; 917 918 PrimaryRegionCountSkewCostFunction(Configuration conf) { 919 super(conf); 920 // Load multiplier should be the greatest as primary regions serve majority of reads/writes. 
921 this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, 922 DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST)); 923 } 924 925 @Override 926 boolean isNeeded() { 927 return cluster.hasRegionReplicas; 928 } 929 930 @Override 931 protected double cost() { 932 if (!cluster.hasRegionReplicas) { 933 return 0; 934 } 935 if (stats == null || stats.length != cluster.numServers) { 936 stats = new double[cluster.numServers]; 937 } 938 939 for (int i = 0; i < cluster.numServers; i++) { 940 stats[i] = 0; 941 for (int regionIdx : cluster.regionsPerServer[i]) { 942 if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) { 943 stats[i]++; 944 } 945 } 946 } 947 948 return costFromArray(stats); 949 } 950 } 951 952 /** 953 * Compute the cost of a potential cluster configuration based upon how evenly 954 * distributed tables are. 955 */ 956 static class TableSkewCostFunction extends CostFunction { 957 958 private static final String TABLE_SKEW_COST_KEY = 959 "hbase.master.balancer.stochastic.tableSkewCost"; 960 private static final float DEFAULT_TABLE_SKEW_COST = 35; 961 962 TableSkewCostFunction(Configuration conf) { 963 super(conf); 964 this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST)); 965 } 966 967 @Override 968 protected double cost() { 969 double max = cluster.numRegions; 970 double min = ((double) cluster.numRegions) / cluster.numServers; 971 double value = 0; 972 973 for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) { 974 value += cluster.numMaxRegionsPerTable[i]; 975 } 976 977 return scale(min, max, value); 978 } 979 } 980 981 /** 982 * Compute a cost of a potential cluster configuration based upon where 983 * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located. 
984 */ 985 static abstract class LocalityBasedCostFunction extends CostFunction { 986 987 private final LocalityType type; 988 989 private double bestLocality; // best case locality across cluster weighted by local data size 990 private double locality; // current locality across cluster weighted by local data size 991 992 private MasterServices services; 993 994 LocalityBasedCostFunction(Configuration conf, 995 MasterServices srv, 996 LocalityType type, 997 String localityCostKey, 998 float defaultLocalityCost) { 999 super(conf); 1000 this.type = type; 1001 this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost)); 1002 this.services = srv; 1003 this.locality = 0.0; 1004 this.bestLocality = 0.0; 1005 } 1006 1007 /** 1008 * Maps region to the current entity (server or rack) on which it is stored 1009 */ 1010 abstract int regionIndexToEntityIndex(int region); 1011 1012 public void setServices(MasterServices srvc) { 1013 this.services = srvc; 1014 } 1015 1016 @Override 1017 void init(Cluster cluster) { 1018 super.init(cluster); 1019 locality = 0.0; 1020 bestLocality = 0.0; 1021 1022 // If no master, no computation will work, so assume 0 cost 1023 if (this.services == null) { 1024 return; 1025 } 1026 1027 for (int region = 0; region < cluster.numRegions; region++) { 1028 locality += getWeightedLocality(region, regionIndexToEntityIndex(region)); 1029 bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region)); 1030 } 1031 1032 // We normalize locality to be a score between 0 and 1.0 representing how good it 1033 // is compared to how good it could be. If bestLocality is 0, assume locality is 100 1034 // (and the cost is 0) 1035 locality = bestLocality == 0 ? 1.0 : locality / bestLocality; 1036 } 1037 1038 @Override 1039 protected void regionMoved(int region, int oldServer, int newServer) { 1040 int oldEntity = type == LocalityType.SERVER ? 
oldServer : cluster.serverIndexToRackIndex[oldServer]; 1041 int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer]; 1042 if (this.services == null) { 1043 return; 1044 } 1045 double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity); 1046 double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality; 1047 locality += normalizedDelta; 1048 } 1049 1050 @Override 1051 protected double cost() { 1052 return 1 - locality; 1053 } 1054 1055 private int getMostLocalEntityForRegion(int region) { 1056 return cluster.getOrComputeRegionsToMostLocalEntities(type)[region]; 1057 } 1058 1059 private double getWeightedLocality(int region, int entity) { 1060 return cluster.getOrComputeWeightedLocality(region, entity, type); 1061 } 1062 1063 } 1064 1065 static class ServerLocalityCostFunction extends LocalityBasedCostFunction { 1066 1067 private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost"; 1068 private static final float DEFAULT_LOCALITY_COST = 25; 1069 1070 ServerLocalityCostFunction(Configuration conf, MasterServices srv) { 1071 super( 1072 conf, 1073 srv, 1074 LocalityType.SERVER, 1075 LOCALITY_COST_KEY, 1076 DEFAULT_LOCALITY_COST 1077 ); 1078 } 1079 1080 @Override 1081 int regionIndexToEntityIndex(int region) { 1082 return cluster.regionIndexToServerIndex[region]; 1083 } 1084 } 1085 1086 static class RackLocalityCostFunction extends LocalityBasedCostFunction { 1087 1088 private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost"; 1089 private static final float DEFAULT_RACK_LOCALITY_COST = 15; 1090 1091 public RackLocalityCostFunction(Configuration conf, MasterServices services) { 1092 super( 1093 conf, 1094 services, 1095 LocalityType.RACK, 1096 RACK_LOCALITY_COST_KEY, 1097 DEFAULT_RACK_LOCALITY_COST 1098 ); 1099 } 1100 1101 @Override 1102 int regionIndexToEntityIndex(int region) { 1103 
return cluster.getRackForRegion(region); 1104 } 1105 } 1106 1107 /** 1108 * Base class the allows writing costs functions from rolling average of some 1109 * number from RegionLoad. 1110 */ 1111 abstract static class CostFromRegionLoadFunction extends CostFunction { 1112 1113 private ClusterMetrics clusterStatus = null; 1114 private Map<String, Deque<BalancerRegionLoad>> loads = null; 1115 private double[] stats = null; 1116 CostFromRegionLoadFunction(Configuration conf) { 1117 super(conf); 1118 } 1119 1120 void setClusterMetrics(ClusterMetrics status) { 1121 this.clusterStatus = status; 1122 } 1123 1124 void setLoads(Map<String, Deque<BalancerRegionLoad>> l) { 1125 this.loads = l; 1126 } 1127 1128 @Override 1129 protected double cost() { 1130 if (clusterStatus == null || loads == null) { 1131 return 0; 1132 } 1133 1134 if (stats == null || stats.length != cluster.numServers) { 1135 stats = new double[cluster.numServers]; 1136 } 1137 1138 for (int i =0; i < stats.length; i++) { 1139 //Cost this server has from RegionLoad 1140 long cost = 0; 1141 1142 // for every region on this server get the rl 1143 for(int regionIndex:cluster.regionsPerServer[i]) { 1144 Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex]; 1145 1146 // Now if we found a region load get the type of cost that was requested. 1147 if (regionLoadList != null) { 1148 cost = (long) (cost + getRegionLoadCost(regionLoadList)); 1149 } 1150 } 1151 1152 // Add the total cost to the stats. 1153 stats[i] = cost; 1154 } 1155 1156 // Now return the scaled cost from data held in the stats object. 
1157 return costFromArray(stats); 1158 } 1159 1160 protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) { 1161 double cost = 0; 1162 for (BalancerRegionLoad rl : regionLoadList) { 1163 cost += getCostFromRl(rl); 1164 } 1165 return cost / regionLoadList.size(); 1166 } 1167 1168 protected abstract double getCostFromRl(BalancerRegionLoad rl); 1169 } 1170 1171 /** 1172 * Class to be used for the subset of RegionLoad costs that should be treated as rates. 1173 * We do not compare about the actual rate in requests per second but rather the rate relative 1174 * to the rest of the regions. 1175 */ 1176 abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction { 1177 1178 CostFromRegionLoadAsRateFunction(Configuration conf) { 1179 super(conf); 1180 } 1181 1182 @Override 1183 protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) { 1184 double cost = 0; 1185 double previous = 0; 1186 boolean isFirst = true; 1187 for (BalancerRegionLoad rl : regionLoadList) { 1188 double current = getCostFromRl(rl); 1189 if (isFirst) { 1190 isFirst = false; 1191 } else { 1192 cost += current - previous; 1193 } 1194 previous = current; 1195 } 1196 return Math.max(0, cost / (regionLoadList.size() - 1)); 1197 } 1198 } 1199 1200 /** 1201 * Compute the cost of total number of read requests The more unbalanced the higher the 1202 * computed cost will be. This uses a rolling average of regionload. 
1203 */ 1204 1205 static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction { 1206 1207 private static final String READ_REQUEST_COST_KEY = 1208 "hbase.master.balancer.stochastic.readRequestCost"; 1209 private static final float DEFAULT_READ_REQUEST_COST = 5; 1210 1211 ReadRequestCostFunction(Configuration conf) { 1212 super(conf); 1213 this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST)); 1214 } 1215 1216 @Override 1217 protected double getCostFromRl(BalancerRegionLoad rl) { 1218 return rl.getReadRequestsCount(); 1219 } 1220 } 1221 1222 /** 1223 * Compute the cost of total number of write requests. The more unbalanced the higher the 1224 * computed cost will be. This uses a rolling average of regionload. 1225 */ 1226 static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction { 1227 1228 private static final String WRITE_REQUEST_COST_KEY = 1229 "hbase.master.balancer.stochastic.writeRequestCost"; 1230 private static final float DEFAULT_WRITE_REQUEST_COST = 5; 1231 1232 WriteRequestCostFunction(Configuration conf) { 1233 super(conf); 1234 this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST)); 1235 } 1236 1237 @Override 1238 protected double getCostFromRl(BalancerRegionLoad rl) { 1239 return rl.getWriteRequestsCount(); 1240 } 1241 } 1242 1243 /** 1244 * A cost function for region replicas. We give a very high cost to hosting 1245 * replicas of the same region in the same host. We do not prevent the case 1246 * though, since if numReplicas > numRegionServers, we still want to keep the 1247 * replica open. 
1248 */ 1249 static class RegionReplicaHostCostFunction extends CostFunction { 1250 private static final String REGION_REPLICA_HOST_COST_KEY = 1251 "hbase.master.balancer.stochastic.regionReplicaHostCostKey"; 1252 private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000; 1253 1254 long maxCost = 0; 1255 long[] costsPerGroup; // group is either server, host or rack 1256 int[][] primariesOfRegionsPerGroup; 1257 1258 public RegionReplicaHostCostFunction(Configuration conf) { 1259 super(conf); 1260 this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY, 1261 DEFAULT_REGION_REPLICA_HOST_COST_KEY)); 1262 } 1263 1264 @Override 1265 void init(Cluster cluster) { 1266 super.init(cluster); 1267 // max cost is the case where every region replica is hosted together regardless of host 1268 maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0; 1269 costsPerGroup = new long[cluster.numHosts]; 1270 primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based 1271 ? 
cluster.primariesOfRegionsPerHost 1272 : cluster.primariesOfRegionsPerServer; 1273 for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) { 1274 costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]); 1275 } 1276 } 1277 1278 long getMaxCost(Cluster cluster) { 1279 if (!cluster.hasRegionReplicas) { 1280 return 0; // short circuit 1281 } 1282 // max cost is the case where every region replica is hosted together regardless of host 1283 int[] primariesOfRegions = new int[cluster.numRegions]; 1284 System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0, 1285 cluster.regions.length); 1286 1287 Arrays.sort(primariesOfRegions); 1288 1289 // compute numReplicas from the sorted array 1290 return costPerGroup(primariesOfRegions); 1291 } 1292 1293 @Override 1294 boolean isNeeded() { 1295 return cluster.hasRegionReplicas; 1296 } 1297 1298 @Override 1299 protected double cost() { 1300 if (maxCost <= 0) { 1301 return 0; 1302 } 1303 1304 long totalCost = 0; 1305 for (int i = 0 ; i < costsPerGroup.length; i++) { 1306 totalCost += costsPerGroup[i]; 1307 } 1308 return scale(0, maxCost, totalCost); 1309 } 1310 1311 /** 1312 * For each primary region, it computes the total number of replicas in the array (numReplicas) 1313 * and returns a sum of numReplicas-1 squared. For example, if the server hosts 1314 * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it 1315 * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1). 1316 * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted 1317 * @return a sum of numReplicas-1 squared for each primary region in the group. 1318 */ 1319 protected long costPerGroup(int[] primariesOfRegions) { 1320 long cost = 0; 1321 int currentPrimary = -1; 1322 int currentPrimaryIndex = -1; 1323 // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions 1324 // sharing the same primary will have consecutive numbers in the array. 
1325 for (int j = 0 ; j <= primariesOfRegions.length; j++) { 1326 int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1; 1327 if (primary != currentPrimary) { // we see a new primary 1328 int numReplicas = j - currentPrimaryIndex; 1329 // square the cost 1330 if (numReplicas > 1) { // means consecutive primaries, indicating co-location 1331 cost += (numReplicas - 1) * (numReplicas - 1); 1332 } 1333 currentPrimary = primary; 1334 currentPrimaryIndex = j; 1335 } 1336 } 1337 1338 return cost; 1339 } 1340 1341 @Override 1342 protected void regionMoved(int region, int oldServer, int newServer) { 1343 if (maxCost <= 0) { 1344 return; // no need to compute 1345 } 1346 if (cluster.multiServersPerHost) { 1347 int oldHost = cluster.serverIndexToHostIndex[oldServer]; 1348 int newHost = cluster.serverIndexToHostIndex[newServer]; 1349 if (newHost != oldHost) { 1350 costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]); 1351 costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]); 1352 } 1353 } else { 1354 costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]); 1355 costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]); 1356 } 1357 } 1358 } 1359 1360 /** 1361 * A cost function for region replicas for the rack distribution. We give a relatively high 1362 * cost to hosting replicas of the same region in the same rack. We do not prevent the case 1363 * though. 
1364 */ 1365 static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction { 1366 private static final String REGION_REPLICA_RACK_COST_KEY = 1367 "hbase.master.balancer.stochastic.regionReplicaRackCostKey"; 1368 private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000; 1369 1370 public RegionReplicaRackCostFunction(Configuration conf) { 1371 super(conf); 1372 this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, 1373 DEFAULT_REGION_REPLICA_RACK_COST_KEY)); 1374 } 1375 1376 @Override 1377 void init(Cluster cluster) { 1378 this.cluster = cluster; 1379 if (cluster.numRacks <= 1) { 1380 maxCost = 0; 1381 return; // disabled for 1 rack 1382 } 1383 // max cost is the case where every region replica is hosted together regardless of rack 1384 maxCost = getMaxCost(cluster); 1385 costsPerGroup = new long[cluster.numRacks]; 1386 for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) { 1387 costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]); 1388 } 1389 } 1390 1391 @Override 1392 protected void regionMoved(int region, int oldServer, int newServer) { 1393 if (maxCost <= 0) { 1394 return; // no need to compute 1395 } 1396 int oldRack = cluster.serverIndexToRackIndex[oldServer]; 1397 int newRack = cluster.serverIndexToRackIndex[newServer]; 1398 if (newRack != oldRack) { 1399 costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]); 1400 costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]); 1401 } 1402 } 1403 } 1404 1405 /** 1406 * Compute the cost of total memstore size. The more unbalanced the higher the 1407 * computed cost will be. This uses a rolling average of regionload. 
1408 */ 1409 static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction { 1410 1411 private static final String MEMSTORE_SIZE_COST_KEY = 1412 "hbase.master.balancer.stochastic.memstoreSizeCost"; 1413 private static final float DEFAULT_MEMSTORE_SIZE_COST = 5; 1414 1415 MemStoreSizeCostFunction(Configuration conf) { 1416 super(conf); 1417 this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST)); 1418 } 1419 1420 @Override 1421 protected double getCostFromRl(BalancerRegionLoad rl) { 1422 return rl.getMemStoreSizeMB(); 1423 } 1424 } 1425 /** 1426 * Compute the cost of total open storefiles size. The more unbalanced the higher the 1427 * computed cost will be. This uses a rolling average of regionload. 1428 */ 1429 static class StoreFileCostFunction extends CostFromRegionLoadFunction { 1430 1431 private static final String STOREFILE_SIZE_COST_KEY = 1432 "hbase.master.balancer.stochastic.storefileSizeCost"; 1433 private static final float DEFAULT_STOREFILE_SIZE_COST = 5; 1434 1435 StoreFileCostFunction(Configuration conf) { 1436 super(conf); 1437 this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST)); 1438 } 1439 1440 @Override 1441 protected double getCostFromRl(BalancerRegionLoad rl) { 1442 return rl.getStorefileSizeMB(); 1443 } 1444 } 1445 1446 /** 1447 * A helper function to compose the attribute name from tablename and costfunction name 1448 */ 1449 public static String composeAttributeName(String tableName, String costFunctionName) { 1450 return tableName + TABLE_FUNCTION_SEP + costFunctionName; 1451 } 1452}