001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.util.ArrayDeque; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Random; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.ClusterMetrics; 033import org.apache.hadoop.hbase.HBaseInterfaceAudience; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.RegionMetrics; 036import org.apache.hadoop.hbase.ServerMetrics; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.master.MasterServices; 041import org.apache.hadoop.hbase.master.RegionPlan; 042import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action; 043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; 044import 
org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction; 045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType; 046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; 047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 054import org.apache.hbase.thirdparty.com.google.common.base.Optional; 055import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 056 057 058/** 059 * <p>This is a best effort load balancer. Given a Cost function F(C) => x It will 060 * randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the 061 * new cluster state becomes the plan. It includes costs functions to compute the cost of:</p> 062 * <ul> 063 * <li>Region Load</li> 064 * <li>Table Load</li> 065 * <li>Data Locality</li> 066 * <li>Memstore Sizes</li> 067 * <li>Storefile Sizes</li> 068 * </ul> 069 * 070 * 071 * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost 072 * best solution, and 1 is the highest possible cost and the worst solution. 
The computed costs are
 * scaled by their respective multipliers:</p>
 *
 * <ul>
 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
 *   <li>hbase.master.balancer.stochastic.moveCost</li>
 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
 *   <li>hbase.master.balancer.stochastic.localityCost</li>
 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
 * </ul>
 *
 * <p>In addition to the above configurations, the balancer can be tuned by the following
 * configuration values:</p>
 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions, which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
 *   <li>hbase.master.balancer.stochastic.stepsPerRegion, which is the coefficient by which the
 *   number of regions is multiplied to try and get the number of times the balancer will
 *   mutate all servers.</li>
 *   <li>hbase.master.balancer.stochastic.maxSteps, which controls the maximum number of times that
 *   the balancer will try and mutate all the servers.
The balancer will use the minimum of this 095 * value and the above computation.</li> 096 * </ul> 097 * 098 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false 099 * so that the balancer gets the full picture of all loads on the cluster.</p> 100 */ 101@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 102@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC", 103 justification="Complaint is about costFunctions not being synchronized; not end of the world") 104public class StochasticLoadBalancer extends BaseLoadBalancer { 105 106 protected static final String STEPS_PER_REGION_KEY = 107 "hbase.master.balancer.stochastic.stepsPerRegion"; 108 protected static final String MAX_STEPS_KEY = 109 "hbase.master.balancer.stochastic.maxSteps"; 110 protected static final String RUN_MAX_STEPS_KEY = 111 "hbase.master.balancer.stochastic.runMaxSteps"; 112 protected static final String MAX_RUNNING_TIME_KEY = 113 "hbase.master.balancer.stochastic.maxRunningTime"; 114 protected static final String KEEP_REGION_LOADS = 115 "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; 116 private static final String TABLE_FUNCTION_SEP = "_"; 117 protected static final String MIN_COST_NEED_BALANCE_KEY = 118 "hbase.master.balancer.stochastic.minCostNeedBalance"; 119 120 protected static final Random RANDOM = new Random(System.currentTimeMillis()); 121 private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class); 122 123 Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>(); 124 125 // values are defaults 126 private int maxSteps = 1000000; 127 private boolean runMaxSteps = false; 128 private int stepsPerRegion = 800; 129 private long maxRunningTime = 30 * 1000 * 1; // 30 seconds. 
130 private int numRegionLoadsToRemember = 15; 131 private float minCostNeedBalance = 0.05f; 132 133 private List<CandidateGenerator> candidateGenerators; 134 private CostFromRegionLoadFunction[] regionLoadFunctions; 135 private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC 136 137 // to save and report costs to JMX 138 private Double curOverallCost = 0d; 139 private Double[] tempFunctionCosts; 140 private Double[] curFunctionCosts; 141 142 // Keep locality based picker and cost function to alert them 143 // when new services are offered 144 private LocalityBasedCandidateGenerator localityCandidateGenerator; 145 private ServerLocalityCostFunction localityCost; 146 private RackLocalityCostFunction rackLocalityCost; 147 private RegionReplicaHostCostFunction regionReplicaHostCostFunction; 148 private RegionReplicaRackCostFunction regionReplicaRackCostFunction; 149 private boolean isByTable = false; 150 private TableName tableName = null; 151 152 /** 153 * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its 154 * default MetricsBalancer 155 */ 156 public StochasticLoadBalancer() { 157 super(new MetricsStochasticBalancer()); 158 } 159 160 @Override 161 public void onConfigurationChange(Configuration conf) { 162 setConf(conf); 163 } 164 165 @Override 166 public synchronized void setConf(Configuration conf) { 167 super.setConf(conf); 168 maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps); 169 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); 170 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime); 171 runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps); 172 173 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); 174 isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable); 175 minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance); 176 if (localityCandidateGenerator == 
null) { 177 localityCandidateGenerator = new LocalityBasedCandidateGenerator(services); 178 } 179 localityCost = new ServerLocalityCostFunction(conf, services); 180 rackLocalityCost = new RackLocalityCostFunction(conf, services); 181 182 if (this.candidateGenerators == null) { 183 candidateGenerators = Lists.newArrayList(); 184 candidateGenerators.add(new RandomCandidateGenerator()); 185 candidateGenerators.add(new LoadCandidateGenerator()); 186 candidateGenerators.add(localityCandidateGenerator); 187 candidateGenerators.add(new RegionReplicaRackCandidateGenerator()); 188 } 189 regionLoadFunctions = new CostFromRegionLoadFunction[] { 190 new ReadRequestCostFunction(conf), 191 new WriteRequestCostFunction(conf), 192 new MemStoreSizeCostFunction(conf), 193 new StoreFileCostFunction(conf) 194 }; 195 regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); 196 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); 197 costFunctions = new CostFunction[]{ 198 new RegionCountSkewCostFunction(conf), 199 new PrimaryRegionCountSkewCostFunction(conf), 200 new MoveCostFunction(conf), 201 localityCost, 202 rackLocalityCost, 203 new TableSkewCostFunction(conf), 204 regionReplicaHostCostFunction, 205 regionReplicaRackCostFunction, 206 regionLoadFunctions[0], 207 regionLoadFunctions[1], 208 regionLoadFunctions[2], 209 regionLoadFunctions[3], 210 }; 211 curFunctionCosts= new Double[costFunctions.length]; 212 tempFunctionCosts= new Double[costFunctions.length]; 213 LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion + 214 ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc."); 215 } 216 217 protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) { 218 this.candidateGenerators = customCandidateGenerators; 219 } 220 221 @Override 222 protected void setSlop(Configuration conf) { 223 this.slop = conf.getFloat("hbase.regions.slop", 0.001F); 224 } 225 226 @Override 
227 public synchronized void setClusterMetrics(ClusterMetrics st) { 228 super.setClusterMetrics(st); 229 updateRegionLoad(); 230 for(CostFromRegionLoadFunction cost : regionLoadFunctions) { 231 cost.setClusterMetrics(st); 232 } 233 234 // update metrics size 235 try { 236 // by-table or ensemble mode 237 int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1; 238 int functionsCount = getCostFunctionNames().length; 239 240 updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall 241 } catch (Exception e) { 242 LOG.error("failed to get the size of all tables", e); 243 } 244 } 245 246 /** 247 * Update the number of metrics that are reported to JMX 248 */ 249 public void updateMetricsSize(int size) { 250 if (metricsBalancer instanceof MetricsStochasticBalancer) { 251 ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size); 252 } 253 } 254 255 @Override 256 public synchronized void setMasterServices(MasterServices masterServices) { 257 super.setMasterServices(masterServices); 258 this.localityCost.setServices(masterServices); 259 this.rackLocalityCost.setServices(masterServices); 260 this.localityCandidateGenerator.setServices(masterServices); 261 } 262 263 @Override 264 protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) { 265 regionReplicaHostCostFunction.init(c); 266 if (regionReplicaHostCostFunction.cost() > 0) return true; 267 regionReplicaRackCostFunction.init(c); 268 if (regionReplicaRackCostFunction.cost() > 0) return true; 269 return false; 270 } 271 272 @Override 273 protected boolean needsBalance(Cluster cluster) { 274 ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); 275 if (cs.getNumServers() < MIN_SERVER_BALANCE) { 276 if (LOG.isDebugEnabled()) { 277 LOG.debug("Not running balancer because only " + cs.getNumServers() 278 + " active regionserver(s)"); 279 } 280 return false; 281 } 282 if (areSomeRegionReplicasColocated(cluster)) { 283 return true; 284 } 285 286 
double total = 0.0; 287 float sumMultiplier = 0.0f; 288 for (CostFunction c : costFunctions) { 289 float multiplier = c.getMultiplier(); 290 if (multiplier <= 0) { 291 continue; 292 } 293 if (!c.isNeeded()) { 294 LOG.debug("{} not needed", c.getClass().getSimpleName()); 295 continue; 296 } 297 sumMultiplier += multiplier; 298 total += c.cost() * multiplier; 299 } 300 301 if (total <= 0 || sumMultiplier <= 0 302 || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) { 303 if (LOG.isTraceEnabled()) { 304 final String loadBalanceTarget = 305 isByTable ? String.format("table (%s)", tableName) : "cluster"; 306 LOG.trace("Skipping load balancing because the {} is balanced. Total cost: {}, " 307 + "Sum multiplier: {}, Minimum cost needed for balance: {}", loadBalanceTarget, total, 308 sumMultiplier, minCostNeedBalance); 309 } 310 return false; 311 } 312 return true; 313 } 314 315 @Override 316 public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, 317 List<RegionInfo>> clusterState) { 318 this.tableName = tableName; 319 return balanceCluster(clusterState); 320 } 321 322 @VisibleForTesting 323 Cluster.Action nextAction(Cluster cluster) { 324 return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size())) 325 .generate(cluster); 326 } 327 328 /** 329 * Given the cluster state this will try and approach an optimal balance. This 330 * should always approach the optimal state given enough steps. 
331 */ 332 @Override 333 public synchronized List<RegionPlan> balanceCluster(Map<ServerName, 334 List<RegionInfo>> clusterState) { 335 List<RegionPlan> plans = balanceMasterRegions(clusterState); 336 if (plans != null || clusterState == null || clusterState.size() <= 1) { 337 return plans; 338 } 339 340 if (masterServerName != null && clusterState.containsKey(masterServerName)) { 341 if (clusterState.size() <= 2) { 342 return null; 343 } 344 clusterState = new HashMap<>(clusterState); 345 clusterState.remove(masterServerName); 346 } 347 348 // On clusters with lots of HFileLinks or lots of reference files, 349 // instantiating the storefile infos can be quite expensive. 350 // Allow turning this feature off if the locality cost is not going to 351 // be used in any computations. 352 RegionLocationFinder finder = null; 353 if ((this.localityCost != null && this.localityCost.getMultiplier() > 0) 354 || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) { 355 finder = this.regionFinder; 356 } 357 358 //The clusterState that is given to this method contains the state 359 //of all the regions in the table(s) (that's true today) 360 // Keep track of servers to iterate through them. 
361 Cluster cluster = new Cluster(clusterState, loads, finder, rackManager); 362 363 long startTime = EnvironmentEdgeManager.currentTime(); 364 365 initCosts(cluster); 366 367 if (!needsBalance(cluster)) { 368 return null; 369 } 370 371 double currentCost = computeCost(cluster, Double.MAX_VALUE); 372 curOverallCost = currentCost; 373 for (int i = 0; i < this.curFunctionCosts.length; i++) { 374 curFunctionCosts[i] = tempFunctionCosts[i]; 375 } 376 double initCost = currentCost; 377 double newCost = currentCost; 378 379 long computedMaxSteps; 380 if (runMaxSteps) { 381 computedMaxSteps = Math.max(this.maxSteps, 382 ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); 383 } else { 384 long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion * 385 (long)cluster.numServers; 386 computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps); 387 if (calculatedMaxSteps > maxSteps) { 388 LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than " 389 + "maxSteps:{}. Hence load balancing may not work well. Setting parameter " 390 + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue." 391 + "(This config change does not require service restart)", calculatedMaxSteps, 392 maxRunningTime); 393 } 394 } 395 LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost=" 396 + functionCost() + " computedMaxSteps: " + computedMaxSteps); 397 398 // Perform a stochastic walk to see if we can get a good fit. 399 long step; 400 401 for (step = 0; step < computedMaxSteps; step++) { 402 Cluster.Action action = nextAction(cluster); 403 404 if (action.type == Type.NULL) { 405 continue; 406 } 407 408 cluster.doAction(action); 409 updateCostsWithAction(cluster, action); 410 411 newCost = computeCost(cluster, currentCost); 412 413 // Should this be kept? 
414 if (newCost < currentCost) { 415 currentCost = newCost; 416 417 // save for JMX 418 curOverallCost = currentCost; 419 for (int i = 0; i < this.curFunctionCosts.length; i++) { 420 curFunctionCosts[i] = tempFunctionCosts[i]; 421 } 422 } else { 423 // Put things back the way they were before. 424 // TODO: undo by remembering old values 425 Action undoAction = action.undoAction(); 426 cluster.doAction(undoAction); 427 updateCostsWithAction(cluster, undoAction); 428 } 429 430 if (EnvironmentEdgeManager.currentTime() - startTime > 431 maxRunningTime) { 432 break; 433 } 434 } 435 long endTime = EnvironmentEdgeManager.currentTime(); 436 437 metricsBalancer.balanceCluster(endTime - startTime); 438 439 // update costs metrics 440 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 441 if (initCost > currentCost) { 442 plans = createRegionPlans(cluster); 443 LOG.info("Finished computing new load balance plan. Computation took {}" + 444 " to try {} different iterations. Found a solution that moves " + 445 "{} regions; Going from a computed cost of {}" + 446 " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime), 447 step, plans.size(), initCost, currentCost); 448 return plans; 449 } 450 LOG.info("Could not find a better load balance plan. 
Tried {} different configurations in " + 451 "{}, and did not find anything with a computed cost less than {}", step, 452 java.time.Duration.ofMillis(endTime - startTime), initCost); 453 return null; 454 } 455 456 /** 457 * update costs to JMX 458 */ 459 private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) { 460 if (tableName == null) return; 461 462 // check if the metricsBalancer is MetricsStochasticBalancer before casting 463 if (metricsBalancer instanceof MetricsStochasticBalancer) { 464 MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer; 465 // overall cost 466 balancer.updateStochasticCost(tableName.getNameAsString(), 467 "Overall", "Overall cost", overall); 468 469 // each cost function 470 for (int i = 0; i < costFunctions.length; i++) { 471 CostFunction costFunction = costFunctions[i]; 472 String costFunctionName = costFunction.getClass().getSimpleName(); 473 Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall); 474 // TODO: cost function may need a specific description 475 balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName, 476 "The percent of " + costFunctionName, costPercent); 477 } 478 } 479 } 480 481 private String functionCost() { 482 StringBuilder builder = new StringBuilder(); 483 for (CostFunction c:costFunctions) { 484 builder.append(c.getClass().getSimpleName()); 485 builder.append(" : ("); 486 builder.append(c.getMultiplier()); 487 builder.append(", "); 488 builder.append(c.cost()); 489 builder.append("); "); 490 } 491 return builder.toString(); 492 } 493 494 /** 495 * Create all of the RegionPlan's needed to move from the initial cluster state to the desired 496 * state. 497 * 498 * @param cluster The state of the cluster 499 * @return List of RegionPlan's that represent the moves needed to get to desired final state. 
500 */ 501 private List<RegionPlan> createRegionPlans(Cluster cluster) { 502 List<RegionPlan> plans = new LinkedList<>(); 503 for (int regionIndex = 0; 504 regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) { 505 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex]; 506 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex]; 507 508 if (initialServerIndex != newServerIndex) { 509 RegionInfo region = cluster.regions[regionIndex]; 510 ServerName initialServer = cluster.servers[initialServerIndex]; 511 ServerName newServer = cluster.servers[newServerIndex]; 512 513 if (LOG.isTraceEnabled()) { 514 LOG.trace("Moving Region " + region.getEncodedName() + " from server " 515 + initialServer.getHostname() + " to " + newServer.getHostname()); 516 } 517 RegionPlan rp = new RegionPlan(region, initialServer, newServer); 518 plans.add(rp); 519 } 520 } 521 return plans; 522 } 523 524 /** 525 * Store the current region loads. 526 */ 527 private synchronized void updateRegionLoad() { 528 // We create a new hashmap so that regions that are no longer there are removed. 529 // However we temporarily need the old loads so we can use them to keep the rolling average. 
530 Map<String, Deque<BalancerRegionLoad>> oldLoads = loads; 531 loads = new HashMap<>(); 532 533 clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> { 534 sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> { 535 String regionNameAsString = RegionInfo.getRegionNameAsString(regionName); 536 Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString); 537 if (rLoads == null) { 538 rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1); 539 } else if (rLoads.size() >= numRegionLoadsToRemember) { 540 rLoads.remove(); 541 } 542 rLoads.add(new BalancerRegionLoad(rm)); 543 loads.put(regionNameAsString, rLoads); 544 }); 545 }); 546 547 for(CostFromRegionLoadFunction cost : regionLoadFunctions) { 548 cost.setLoads(loads); 549 } 550 } 551 552 protected void initCosts(Cluster cluster) { 553 for (CostFunction c:costFunctions) { 554 c.init(cluster); 555 } 556 } 557 558 protected void updateCostsWithAction(Cluster cluster, Action action) { 559 for (CostFunction c : costFunctions) { 560 c.postAction(action); 561 } 562 } 563 564 /** 565 * Get the names of the cost functions 566 */ 567 public String[] getCostFunctionNames() { 568 if (costFunctions == null) return null; 569 String[] ret = new String[costFunctions.length]; 570 for (int i = 0; i < costFunctions.length; i++) { 571 CostFunction c = costFunctions[i]; 572 ret[i] = c.getClass().getSimpleName(); 573 } 574 575 return ret; 576 } 577 578 /** 579 * This is the main cost function. It will compute a cost associated with a proposed cluster 580 * state. All different costs will be combined with their multipliers to produce a double cost. 581 * 582 * @param cluster The state of the cluster 583 * @param previousCost the previous cost. This is used as an early out. 584 * @return a double of a cost associated with the proposed cluster state. This cost is an 585 * aggregate of all individual cost functions. 
586 */ 587 protected double computeCost(Cluster cluster, double previousCost) { 588 double total = 0; 589 590 for (int i = 0; i < costFunctions.length; i++) { 591 CostFunction c = costFunctions[i]; 592 this.tempFunctionCosts[i] = 0.0; 593 594 if (c.getMultiplier() <= 0) { 595 continue; 596 } 597 598 Float multiplier = c.getMultiplier(); 599 Double cost = c.cost(); 600 601 this.tempFunctionCosts[i] = multiplier*cost; 602 total += this.tempFunctionCosts[i]; 603 604 if (total > previousCost) { 605 break; 606 } 607 } 608 609 return total; 610 } 611 612 /** Generates a candidate action to be applied to the cluster for cost function search */ 613 abstract static class CandidateGenerator { 614 abstract Cluster.Action generate(Cluster cluster); 615 616 /** 617 * From a list of regions pick a random one. Null can be returned which 618 * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move 619 * rather than swap. 620 * 621 * @param cluster The state of the cluster 622 * @param server index of the server 623 * @param chanceOfNoSwap Chance that this will decide to try a move rather 624 * than a swap. 625 * @return a random {@link RegionInfo} or null if an asymmetrical move is 626 * suggested. 627 */ 628 protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) { 629 // Check to see if this is just a move. 630 if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) { 631 // signal a move only. 
632 return -1; 633 } 634 int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length); 635 return cluster.regionsPerServer[server][rand]; 636 637 } 638 protected int pickRandomServer(Cluster cluster) { 639 if (cluster.numServers < 1) { 640 return -1; 641 } 642 643 return RANDOM.nextInt(cluster.numServers); 644 } 645 646 protected int pickRandomRack(Cluster cluster) { 647 if (cluster.numRacks < 1) { 648 return -1; 649 } 650 651 return RANDOM.nextInt(cluster.numRacks); 652 } 653 654 protected int pickOtherRandomServer(Cluster cluster, int serverIndex) { 655 if (cluster.numServers < 2) { 656 return -1; 657 } 658 while (true) { 659 int otherServerIndex = pickRandomServer(cluster); 660 if (otherServerIndex != serverIndex) { 661 return otherServerIndex; 662 } 663 } 664 } 665 666 protected int pickOtherRandomRack(Cluster cluster, int rackIndex) { 667 if (cluster.numRacks < 2) { 668 return -1; 669 } 670 while (true) { 671 int otherRackIndex = pickRandomRack(cluster); 672 if (otherRackIndex != rackIndex) { 673 return otherRackIndex; 674 } 675 } 676 } 677 678 protected Cluster.Action pickRandomRegions(Cluster cluster, 679 int thisServer, 680 int otherServer) { 681 if (thisServer < 0 || otherServer < 0) { 682 return Cluster.NullAction; 683 } 684 685 // Decide who is most likely to need another region 686 int thisRegionCount = cluster.getNumRegions(thisServer); 687 int otherRegionCount = cluster.getNumRegions(otherServer); 688 689 // Assign the chance based upon the above 690 double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5; 691 double otherChance = (thisRegionCount <= otherRegionCount) ? 
0 : 0.5; 692 693 int thisRegion = pickRandomRegion(cluster, thisServer, thisChance); 694 int otherRegion = pickRandomRegion(cluster, otherServer, otherChance); 695 696 return getAction(thisServer, thisRegion, otherServer, otherRegion); 697 } 698 699 protected Cluster.Action getAction(int fromServer, int fromRegion, 700 int toServer, int toRegion) { 701 if (fromServer < 0 || toServer < 0) { 702 return Cluster.NullAction; 703 } 704 if (fromRegion > 0 && toRegion > 0) { 705 return new Cluster.SwapRegionsAction(fromServer, fromRegion, 706 toServer, toRegion); 707 } else if (fromRegion > 0) { 708 return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer); 709 } else if (toRegion > 0) { 710 return new Cluster.MoveRegionAction(toRegion, toServer, fromServer); 711 } else { 712 return Cluster.NullAction; 713 } 714 } 715 716 /** 717 * Returns a random iteration order of indexes of an array with size length 718 */ 719 protected List<Integer> getRandomIterationOrder(int length) { 720 ArrayList<Integer> order = new ArrayList<>(length); 721 for (int i = 0; i < length; i++) { 722 order.add(i); 723 } 724 Collections.shuffle(order); 725 return order; 726 } 727 } 728 729 static class RandomCandidateGenerator extends CandidateGenerator { 730 731 @Override 732 Cluster.Action generate(Cluster cluster) { 733 734 int thisServer = pickRandomServer(cluster); 735 736 // Pick the other server 737 int otherServer = pickOtherRandomServer(cluster, thisServer); 738 739 return pickRandomRegions(cluster, thisServer, otherServer); 740 } 741 } 742 743 static class LoadCandidateGenerator extends CandidateGenerator { 744 745 @Override 746 Cluster.Action generate(Cluster cluster) { 747 cluster.sortServersByRegionCount(); 748 int thisServer = pickMostLoadedServer(cluster, -1); 749 int otherServer = pickLeastLoadedServer(cluster, thisServer); 750 751 return pickRandomRegions(cluster, thisServer, otherServer); 752 } 753 754 private int pickLeastLoadedServer(final Cluster cluster, int 
thisServer) { 755 Integer[] servers = cluster.serverIndicesSortedByRegionCount; 756 757 int index = 0; 758 while (servers[index] == null || servers[index] == thisServer) { 759 index++; 760 if (index == servers.length) { 761 return -1; 762 } 763 } 764 return servers[index]; 765 } 766 767 private int pickMostLoadedServer(final Cluster cluster, int thisServer) { 768 Integer[] servers = cluster.serverIndicesSortedByRegionCount; 769 770 int index = servers.length - 1; 771 while (servers[index] == null || servers[index] == thisServer) { 772 index--; 773 if (index < 0) { 774 return -1; 775 } 776 } 777 return servers[index]; 778 } 779 } 780 781 static class LocalityBasedCandidateGenerator extends CandidateGenerator { 782 783 private MasterServices masterServices; 784 785 LocalityBasedCandidateGenerator(MasterServices masterServices) { 786 this.masterServices = masterServices; 787 } 788 789 @Override 790 Cluster.Action generate(Cluster cluster) { 791 if (this.masterServices == null) { 792 int thisServer = pickRandomServer(cluster); 793 // Pick the other server 794 int otherServer = pickOtherRandomServer(cluster, thisServer); 795 return pickRandomRegions(cluster, thisServer, otherServer); 796 } 797 798 // Randomly iterate through regions until you find one that is not on ideal host 799 for (int region : getRandomIterationOrder(cluster.numRegions)) { 800 int currentServer = cluster.regionIndexToServerIndex[region]; 801 if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) { 802 Optional<Action> potential = tryMoveOrSwap( 803 cluster, 804 currentServer, 805 region, 806 cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region] 807 ); 808 if (potential.isPresent()) { 809 return potential.get(); 810 } 811 } 812 } 813 return Cluster.NullAction; 814 } 815 816 /** 817 * Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved. 
818 * Returns empty optional if no move can be found 819 */ 820 private Optional<Action> tryMoveOrSwap(Cluster cluster, 821 int fromServer, 822 int fromRegion, 823 int toServer) { 824 // Try move first. We know apriori fromRegion has the highest locality on toServer 825 if (cluster.serverHasTooFewRegions(toServer)) { 826 return Optional.of(getAction(fromServer, fromRegion, toServer, -1)); 827 } 828 829 // Compare locality gain/loss from swapping fromRegion with regions on toServer 830 double fromRegionLocalityDelta = 831 getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer); 832 for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) { 833 int toRegion = cluster.regionsPerServer[toServer][toRegionIndex]; 834 double toRegionLocalityDelta = 835 getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer); 836 // If locality would remain neutral or improve, attempt the swap 837 if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) { 838 return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion)); 839 } 840 } 841 842 return Optional.absent(); 843 } 844 845 private double getWeightedLocality(Cluster cluster, int region, int server) { 846 return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER); 847 } 848 849 void setServices(MasterServices services) { 850 this.masterServices = services; 851 } 852 } 853 854 /** 855 * Generates candidates which moves the replicas out of the region server for 856 * co-hosted region replicas 857 */ 858 static class RegionReplicaCandidateGenerator extends CandidateGenerator { 859 860 RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator(); 861 862 /** 863 * Randomly select one regionIndex out of all region replicas co-hosted in the same group 864 * (a group is a server, host or rack) 865 * @param primariesOfRegionsPerGroup either 
Cluster.primariesOfRegionsPerServer, 866 * primariesOfRegionsPerHost or primariesOfRegionsPerRack 867 * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack 868 * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex 869 * @return a regionIndex for the selected primary or -1 if there is no co-locating 870 */ 871 int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup 872 , int[] regionIndexToPrimaryIndex) { 873 int currentPrimary = -1; 874 int currentPrimaryIndex = -1; 875 int selectedPrimaryIndex = -1; 876 double currentLargestRandom = -1; 877 // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region 878 // ids for the regions hosted in server, a consecutive repetition means that replicas 879 // are co-hosted 880 for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) { 881 int primary = j < primariesOfRegionsPerGroup.length 882 ? primariesOfRegionsPerGroup[j] : -1; 883 if (primary != currentPrimary) { // check for whether we see a new primary 884 int numReplicas = j - currentPrimaryIndex; 885 if (numReplicas > 1) { // means consecutive primaries, indicating co-location 886 // decide to select this primary region id or not 887 double currentRandom = RANDOM.nextDouble(); 888 // we don't know how many region replicas are co-hosted, we will randomly select one 889 // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html) 890 if (currentRandom > currentLargestRandom) { 891 selectedPrimaryIndex = currentPrimary; 892 currentLargestRandom = currentRandom; 893 } 894 } 895 currentPrimary = primary; 896 currentPrimaryIndex = j; 897 } 898 } 899 900 // we have found the primary id for the region to move. Now find the actual regionIndex 901 // with the given primary, prefer to move the secondary region. 
902 for (int j = 0; j < regionsPerGroup.length; j++) { 903 int regionIndex = regionsPerGroup[j]; 904 if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) { 905 // always move the secondary, not the primary 906 if (selectedPrimaryIndex != regionIndex) { 907 return regionIndex; 908 } 909 } 910 } 911 return -1; 912 } 913 914 @Override 915 Cluster.Action generate(Cluster cluster) { 916 int serverIndex = pickRandomServer(cluster); 917 if (cluster.numServers <= 1 || serverIndex == -1) { 918 return Cluster.NullAction; 919 } 920 921 int regionIndex = selectCoHostedRegionPerGroup( 922 cluster.primariesOfRegionsPerServer[serverIndex], 923 cluster.regionsPerServer[serverIndex], 924 cluster.regionIndexToPrimaryIndex); 925 926 // if there are no pairs of region replicas co-hosted, default to random generator 927 if (regionIndex == -1) { 928 // default to randompicker 929 return randomGenerator.generate(cluster); 930 } 931 932 int toServerIndex = pickOtherRandomServer(cluster, serverIndex); 933 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f); 934 return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex); 935 } 936 } 937 938 /** 939 * Generates candidates which moves the replicas out of the rack for 940 * co-hosted region replicas in the same rack 941 */ 942 static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator { 943 @Override 944 Cluster.Action generate(Cluster cluster) { 945 int rackIndex = pickRandomRack(cluster); 946 if (cluster.numRacks <= 1 || rackIndex == -1) { 947 return super.generate(cluster); 948 } 949 950 int regionIndex = selectCoHostedRegionPerGroup( 951 cluster.primariesOfRegionsPerRack[rackIndex], 952 cluster.regionsPerRack[rackIndex], 953 cluster.regionIndexToPrimaryIndex); 954 955 // if there are no pairs of region replicas co-hosted, default to random generator 956 if (regionIndex == -1) { 957 // default to randompicker 958 return randomGenerator.generate(cluster); 959 } 960 961 
int serverIndex = cluster.regionIndexToServerIndex[regionIndex]; 962 int toRackIndex = pickOtherRandomRack(cluster, rackIndex); 963 964 int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length); 965 int toServerIndex = cluster.serversPerRack[toRackIndex][rand]; 966 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f); 967 return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex); 968 } 969 } 970 971 /** 972 * Base class of StochasticLoadBalancer's Cost Functions. 973 */ 974 abstract static class CostFunction { 975 976 private float multiplier = 0; 977 978 protected Cluster cluster; 979 980 CostFunction(Configuration c) { 981 } 982 983 boolean isNeeded() { 984 return true; 985 } 986 float getMultiplier() { 987 return multiplier; 988 } 989 990 void setMultiplier(float m) { 991 this.multiplier = m; 992 } 993 994 /** Called once per LB invocation to give the cost function 995 * to initialize it's state, and perform any costly calculation. 996 */ 997 void init(Cluster cluster) { 998 this.cluster = cluster; 999 } 1000 1001 /** Called once per cluster Action to give the cost function 1002 * an opportunity to update it's state. postAction() is always 1003 * called at least once before cost() is called with the cluster 1004 * that this action is performed on. 
*/ 1005 void postAction(Action action) { 1006 switch (action.type) { 1007 case NULL: break; 1008 case ASSIGN_REGION: 1009 AssignRegionAction ar = (AssignRegionAction) action; 1010 regionMoved(ar.region, -1, ar.server); 1011 break; 1012 case MOVE_REGION: 1013 MoveRegionAction mra = (MoveRegionAction) action; 1014 regionMoved(mra.region, mra.fromServer, mra.toServer); 1015 break; 1016 case SWAP_REGIONS: 1017 SwapRegionsAction a = (SwapRegionsAction) action; 1018 regionMoved(a.fromRegion, a.fromServer, a.toServer); 1019 regionMoved(a.toRegion, a.toServer, a.fromServer); 1020 break; 1021 default: 1022 throw new RuntimeException("Uknown action:" + action.type); 1023 } 1024 } 1025 1026 protected void regionMoved(int region, int oldServer, int newServer) { 1027 } 1028 1029 abstract double cost(); 1030 1031 /** 1032 * Function to compute a scaled cost using {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics}. 1033 * It assumes that this is a zero sum set of costs. It assumes that the worst case 1034 * possible is all of the elements in one region server and the rest having 0. 1035 * 1036 * @param stats the costs 1037 * @return a scaled set of costs. 1038 */ 1039 protected double costFromArray(double[] stats) { 1040 double totalCost = 0; 1041 double total = getSum(stats); 1042 1043 double count = stats.length; 1044 double mean = total/count; 1045 1046 // Compute max as if all region servers had 0 and one had the sum of all costs. This must be 1047 // a zero sum cost for this to make sense. 1048 double max = ((count - 1) * mean) + (total - mean); 1049 1050 // It's possible that there aren't enough regions to go around 1051 double min; 1052 if (count > total) { 1053 min = ((count - total) * mean) + ((1 - mean) * total); 1054 } else { 1055 // Some will have 1 more than everything else. 
1056 int numHigh = (int) (total - (Math.floor(mean) * count)); 1057 int numLow = (int) (count - numHigh); 1058 1059 min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean))); 1060 1061 } 1062 min = Math.max(0, min); 1063 for (int i=0; i<stats.length; i++) { 1064 double n = stats[i]; 1065 double diff = Math.abs(mean - n); 1066 totalCost += diff; 1067 } 1068 1069 double scaled = scale(min, max, totalCost); 1070 return scaled; 1071 } 1072 1073 private double getSum(double[] stats) { 1074 double total = 0; 1075 for(double s:stats) { 1076 total += s; 1077 } 1078 return total; 1079 } 1080 1081 /** 1082 * Scale the value between 0 and 1. 1083 * 1084 * @param min Min value 1085 * @param max The Max value 1086 * @param value The value to be scaled. 1087 * @return The scaled value. 1088 */ 1089 protected double scale(double min, double max, double value) { 1090 if (max <= min || value <= min) { 1091 return 0; 1092 } 1093 if ((max - min) == 0) return 0; 1094 1095 return Math.max(0d, Math.min(1d, (value - min) / (max - min))); 1096 } 1097 } 1098 1099 /** 1100 * Given the starting state of the regions and a potential ending state 1101 * compute cost based upon the number of regions that have moved. 1102 */ 1103 static class MoveCostFunction extends CostFunction { 1104 private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost"; 1105 private static final String MAX_MOVES_PERCENT_KEY = 1106 "hbase.master.balancer.stochastic.maxMovePercent"; 1107 private static final float DEFAULT_MOVE_COST = 7; 1108 private static final int DEFAULT_MAX_MOVES = 600; 1109 private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f; 1110 1111 private final float maxMovesPercent; 1112 1113 MoveCostFunction(Configuration conf) { 1114 super(conf); 1115 1116 // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure 1117 // that large benefits are need to overcome the cost of a move. 
1118 this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST)); 1119 // What percent of the number of regions a single run of the balancer can move. 1120 maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT); 1121 } 1122 1123 @Override 1124 double cost() { 1125 // Try and size the max number of Moves, but always be prepared to move some. 1126 int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent), 1127 DEFAULT_MAX_MOVES); 1128 1129 double moveCost = cluster.numMovedRegions; 1130 1131 // Don't let this single balance move more than the max moves. 1132 // This allows better scaling to accurately represent the actual cost of a move. 1133 if (moveCost > maxMoves) { 1134 return 1000000; // return a number much greater than any of the other cost 1135 } 1136 1137 return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost); 1138 } 1139 } 1140 1141 /** 1142 * Compute the cost of a potential cluster state from skew in number of 1143 * regions on a cluster. 1144 */ 1145 static class RegionCountSkewCostFunction extends CostFunction { 1146 private static final String REGION_COUNT_SKEW_COST_KEY = 1147 "hbase.master.balancer.stochastic.regionCountCost"; 1148 private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500; 1149 1150 private double[] stats = null; 1151 1152 RegionCountSkewCostFunction(Configuration conf) { 1153 super(conf); 1154 // Load multiplier should be the greatest as it is the most general way to balance data. 
1155 this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST)); 1156 } 1157 1158 @Override 1159 double cost() { 1160 if (stats == null || stats.length != cluster.numServers) { 1161 stats = new double[cluster.numServers]; 1162 } 1163 1164 for (int i =0; i < cluster.numServers; i++) { 1165 stats[i] = cluster.regionsPerServer[i].length; 1166 } 1167 1168 return costFromArray(stats); 1169 } 1170 } 1171 1172 /** 1173 * Compute the cost of a potential cluster state from skew in number of 1174 * primary regions on a cluster. 1175 */ 1176 static class PrimaryRegionCountSkewCostFunction extends CostFunction { 1177 private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY = 1178 "hbase.master.balancer.stochastic.primaryRegionCountCost"; 1179 private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500; 1180 1181 private double[] stats = null; 1182 1183 PrimaryRegionCountSkewCostFunction(Configuration conf) { 1184 super(conf); 1185 // Load multiplier should be the greatest as primary regions serve majority of reads/writes. 1186 this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, 1187 DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST)); 1188 } 1189 1190 @Override 1191 boolean isNeeded() { 1192 return cluster.hasRegionReplicas; 1193 } 1194 1195 @Override 1196 double cost() { 1197 if (!cluster.hasRegionReplicas) { 1198 return 0; 1199 } 1200 if (stats == null || stats.length != cluster.numServers) { 1201 stats = new double[cluster.numServers]; 1202 } 1203 1204 for (int i = 0; i < cluster.numServers; i++) { 1205 stats[i] = 0; 1206 for (int regionIdx : cluster.regionsPerServer[i]) { 1207 if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) { 1208 stats[i]++; 1209 } 1210 } 1211 } 1212 1213 return costFromArray(stats); 1214 } 1215 } 1216 1217 /** 1218 * Compute the cost of a potential cluster configuration based upon how evenly 1219 * distributed tables are. 
1220 */ 1221 static class TableSkewCostFunction extends CostFunction { 1222 1223 private static final String TABLE_SKEW_COST_KEY = 1224 "hbase.master.balancer.stochastic.tableSkewCost"; 1225 private static final float DEFAULT_TABLE_SKEW_COST = 35; 1226 1227 TableSkewCostFunction(Configuration conf) { 1228 super(conf); 1229 this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST)); 1230 } 1231 1232 @Override 1233 double cost() { 1234 double max = cluster.numRegions; 1235 double min = ((double) cluster.numRegions) / cluster.numServers; 1236 double value = 0; 1237 1238 for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) { 1239 value += cluster.numMaxRegionsPerTable[i]; 1240 } 1241 1242 return scale(min, max, value); 1243 } 1244 } 1245 1246 /** 1247 * Compute a cost of a potential cluster configuration based upon where 1248 * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located. 1249 */ 1250 static abstract class LocalityBasedCostFunction extends CostFunction { 1251 1252 private final LocalityType type; 1253 1254 private double bestLocality; // best case locality across cluster weighted by local data size 1255 private double locality; // current locality across cluster weighted by local data size 1256 1257 private MasterServices services; 1258 1259 LocalityBasedCostFunction(Configuration conf, 1260 MasterServices srv, 1261 LocalityType type, 1262 String localityCostKey, 1263 float defaultLocalityCost) { 1264 super(conf); 1265 this.type = type; 1266 this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost)); 1267 this.services = srv; 1268 this.locality = 0.0; 1269 this.bestLocality = 0.0; 1270 } 1271 1272 /** 1273 * Maps region to the current entity (server or rack) on which it is stored 1274 */ 1275 abstract int regionIndexToEntityIndex(int region); 1276 1277 public void setServices(MasterServices srvc) { 1278 this.services = srvc; 1279 } 1280 1281 @Override 1282 void init(Cluster cluster) { 1283 
super.init(cluster); 1284 locality = 0.0; 1285 bestLocality = 0.0; 1286 1287 // If no master, no computation will work, so assume 0 cost 1288 if (this.services == null) { 1289 return; 1290 } 1291 1292 for (int region = 0; region < cluster.numRegions; region++) { 1293 locality += getWeightedLocality(region, regionIndexToEntityIndex(region)); 1294 bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region)); 1295 } 1296 1297 // We normalize locality to be a score between 0 and 1.0 representing how good it 1298 // is compared to how good it could be. If bestLocality is 0, assume locality is 100 1299 // (and the cost is 0) 1300 locality = bestLocality == 0 ? 1.0 : locality / bestLocality; 1301 } 1302 1303 @Override 1304 protected void regionMoved(int region, int oldServer, int newServer) { 1305 int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer]; 1306 int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer]; 1307 if (this.services == null) { 1308 return; 1309 } 1310 double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity); 1311 double normalizedDelta = bestLocality == 0 ? 
0.0 : localityDelta / bestLocality; 1312 locality += normalizedDelta; 1313 } 1314 1315 @Override 1316 double cost() { 1317 return 1 - locality; 1318 } 1319 1320 private int getMostLocalEntityForRegion(int region) { 1321 return cluster.getOrComputeRegionsToMostLocalEntities(type)[region]; 1322 } 1323 1324 private double getWeightedLocality(int region, int entity) { 1325 return cluster.getOrComputeWeightedLocality(region, entity, type); 1326 } 1327 1328 } 1329 1330 static class ServerLocalityCostFunction extends LocalityBasedCostFunction { 1331 1332 private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost"; 1333 private static final float DEFAULT_LOCALITY_COST = 25; 1334 1335 ServerLocalityCostFunction(Configuration conf, MasterServices srv) { 1336 super( 1337 conf, 1338 srv, 1339 LocalityType.SERVER, 1340 LOCALITY_COST_KEY, 1341 DEFAULT_LOCALITY_COST 1342 ); 1343 } 1344 1345 @Override 1346 int regionIndexToEntityIndex(int region) { 1347 return cluster.regionIndexToServerIndex[region]; 1348 } 1349 } 1350 1351 static class RackLocalityCostFunction extends LocalityBasedCostFunction { 1352 1353 private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost"; 1354 private static final float DEFAULT_RACK_LOCALITY_COST = 15; 1355 1356 public RackLocalityCostFunction(Configuration conf, MasterServices services) { 1357 super( 1358 conf, 1359 services, 1360 LocalityType.RACK, 1361 RACK_LOCALITY_COST_KEY, 1362 DEFAULT_RACK_LOCALITY_COST 1363 ); 1364 } 1365 1366 @Override 1367 int regionIndexToEntityIndex(int region) { 1368 return cluster.getRackForRegion(region); 1369 } 1370 } 1371 1372 /** 1373 * Base class the allows writing costs functions from rolling average of some 1374 * number from RegionLoad. 
1375 */ 1376 abstract static class CostFromRegionLoadFunction extends CostFunction { 1377 1378 private ClusterMetrics clusterStatus = null; 1379 private Map<String, Deque<BalancerRegionLoad>> loads = null; 1380 private double[] stats = null; 1381 CostFromRegionLoadFunction(Configuration conf) { 1382 super(conf); 1383 } 1384 1385 void setClusterMetrics(ClusterMetrics status) { 1386 this.clusterStatus = status; 1387 } 1388 1389 void setLoads(Map<String, Deque<BalancerRegionLoad>> l) { 1390 this.loads = l; 1391 } 1392 1393 @Override 1394 double cost() { 1395 if (clusterStatus == null || loads == null) { 1396 return 0; 1397 } 1398 1399 if (stats == null || stats.length != cluster.numServers) { 1400 stats = new double[cluster.numServers]; 1401 } 1402 1403 for (int i =0; i < stats.length; i++) { 1404 //Cost this server has from RegionLoad 1405 long cost = 0; 1406 1407 // for every region on this server get the rl 1408 for(int regionIndex:cluster.regionsPerServer[i]) { 1409 Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex]; 1410 1411 // Now if we found a region load get the type of cost that was requested. 1412 if (regionLoadList != null) { 1413 cost = (long) (cost + getRegionLoadCost(regionLoadList)); 1414 } 1415 } 1416 1417 // Add the total cost to the stats. 1418 stats[i] = cost; 1419 } 1420 1421 // Now return the scaled cost from data held in the stats object. 1422 return costFromArray(stats); 1423 } 1424 1425 protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) { 1426 double cost = 0; 1427 for (BalancerRegionLoad rl : regionLoadList) { 1428 cost += getCostFromRl(rl); 1429 } 1430 return cost / regionLoadList.size(); 1431 } 1432 1433 protected abstract double getCostFromRl(BalancerRegionLoad rl); 1434 } 1435 1436 /** 1437 * Class to be used for the subset of RegionLoad costs that should be treated as rates. 
1438 * We do not compare about the actual rate in requests per second but rather the rate relative 1439 * to the rest of the regions. 1440 */ 1441 abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction { 1442 1443 CostFromRegionLoadAsRateFunction(Configuration conf) { 1444 super(conf); 1445 } 1446 1447 @Override 1448 protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) { 1449 double cost = 0; 1450 double previous = 0; 1451 boolean isFirst = true; 1452 for (BalancerRegionLoad rl : regionLoadList) { 1453 double current = getCostFromRl(rl); 1454 if (isFirst) { 1455 isFirst = false; 1456 } else { 1457 cost += current - previous; 1458 } 1459 previous = current; 1460 } 1461 return Math.max(0, cost / (regionLoadList.size() - 1)); 1462 } 1463 } 1464 1465 /** 1466 * Compute the cost of total number of read requests The more unbalanced the higher the 1467 * computed cost will be. This uses a rolling average of regionload. 1468 */ 1469 1470 static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction { 1471 1472 private static final String READ_REQUEST_COST_KEY = 1473 "hbase.master.balancer.stochastic.readRequestCost"; 1474 private static final float DEFAULT_READ_REQUEST_COST = 5; 1475 1476 ReadRequestCostFunction(Configuration conf) { 1477 super(conf); 1478 this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST)); 1479 } 1480 1481 @Override 1482 protected double getCostFromRl(BalancerRegionLoad rl) { 1483 return rl.getReadRequestsCount(); 1484 } 1485 } 1486 1487 /** 1488 * Compute the cost of total number of write requests. The more unbalanced the higher the 1489 * computed cost will be. This uses a rolling average of regionload. 
1490 */ 1491 static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction { 1492 1493 private static final String WRITE_REQUEST_COST_KEY = 1494 "hbase.master.balancer.stochastic.writeRequestCost"; 1495 private static final float DEFAULT_WRITE_REQUEST_COST = 5; 1496 1497 WriteRequestCostFunction(Configuration conf) { 1498 super(conf); 1499 this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST)); 1500 } 1501 1502 @Override 1503 protected double getCostFromRl(BalancerRegionLoad rl) { 1504 return rl.getWriteRequestsCount(); 1505 } 1506 } 1507 1508 /** 1509 * A cost function for region replicas. We give a very high cost to hosting 1510 * replicas of the same region in the same host. We do not prevent the case 1511 * though, since if numReplicas > numRegionServers, we still want to keep the 1512 * replica open. 1513 */ 1514 static class RegionReplicaHostCostFunction extends CostFunction { 1515 private static final String REGION_REPLICA_HOST_COST_KEY = 1516 "hbase.master.balancer.stochastic.regionReplicaHostCostKey"; 1517 private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000; 1518 1519 long maxCost = 0; 1520 long[] costsPerGroup; // group is either server, host or rack 1521 int[][] primariesOfRegionsPerGroup; 1522 1523 public RegionReplicaHostCostFunction(Configuration conf) { 1524 super(conf); 1525 this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY, 1526 DEFAULT_REGION_REPLICA_HOST_COST_KEY)); 1527 } 1528 1529 @Override 1530 void init(Cluster cluster) { 1531 super.init(cluster); 1532 // max cost is the case where every region replica is hosted together regardless of host 1533 maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0; 1534 costsPerGroup = new long[cluster.numHosts]; 1535 primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based 1536 ? 
cluster.primariesOfRegionsPerHost 1537 : cluster.primariesOfRegionsPerServer; 1538 for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) { 1539 costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]); 1540 } 1541 } 1542 1543 long getMaxCost(Cluster cluster) { 1544 if (!cluster.hasRegionReplicas) { 1545 return 0; // short circuit 1546 } 1547 // max cost is the case where every region replica is hosted together regardless of host 1548 int[] primariesOfRegions = new int[cluster.numRegions]; 1549 System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0, 1550 cluster.regions.length); 1551 1552 Arrays.sort(primariesOfRegions); 1553 1554 // compute numReplicas from the sorted array 1555 return costPerGroup(primariesOfRegions); 1556 } 1557 1558 @Override 1559 boolean isNeeded() { 1560 return cluster.hasRegionReplicas; 1561 } 1562 1563 @Override 1564 double cost() { 1565 if (maxCost <= 0) { 1566 return 0; 1567 } 1568 1569 long totalCost = 0; 1570 for (int i = 0 ; i < costsPerGroup.length; i++) { 1571 totalCost += costsPerGroup[i]; 1572 } 1573 return scale(0, maxCost, totalCost); 1574 } 1575 1576 /** 1577 * For each primary region, it computes the total number of replicas in the array (numReplicas) 1578 * and returns a sum of numReplicas-1 squared. For example, if the server hosts 1579 * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it 1580 * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1). 1581 * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted 1582 * @return a sum of numReplicas-1 squared for each primary region in the group. 1583 */ 1584 protected long costPerGroup(int[] primariesOfRegions) { 1585 long cost = 0; 1586 int currentPrimary = -1; 1587 int currentPrimaryIndex = -1; 1588 // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions 1589 // sharing the same primary will have consecutive numbers in the array. 
1590 for (int j = 0 ; j <= primariesOfRegions.length; j++) { 1591 int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1; 1592 if (primary != currentPrimary) { // we see a new primary 1593 int numReplicas = j - currentPrimaryIndex; 1594 // square the cost 1595 if (numReplicas > 1) { // means consecutive primaries, indicating co-location 1596 cost += (numReplicas - 1) * (numReplicas - 1); 1597 } 1598 currentPrimary = primary; 1599 currentPrimaryIndex = j; 1600 } 1601 } 1602 1603 return cost; 1604 } 1605 1606 @Override 1607 protected void regionMoved(int region, int oldServer, int newServer) { 1608 if (maxCost <= 0) { 1609 return; // no need to compute 1610 } 1611 if (cluster.multiServersPerHost) { 1612 int oldHost = cluster.serverIndexToHostIndex[oldServer]; 1613 int newHost = cluster.serverIndexToHostIndex[newServer]; 1614 if (newHost != oldHost) { 1615 costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]); 1616 costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]); 1617 } 1618 } else { 1619 costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]); 1620 costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]); 1621 } 1622 } 1623 } 1624 1625 /** 1626 * A cost function for region replicas for the rack distribution. We give a relatively high 1627 * cost to hosting replicas of the same region in the same rack. We do not prevent the case 1628 * though. 
1629 */ 1630 static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction { 1631 private static final String REGION_REPLICA_RACK_COST_KEY = 1632 "hbase.master.balancer.stochastic.regionReplicaRackCostKey"; 1633 private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000; 1634 1635 public RegionReplicaRackCostFunction(Configuration conf) { 1636 super(conf); 1637 this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, 1638 DEFAULT_REGION_REPLICA_RACK_COST_KEY)); 1639 } 1640 1641 @Override 1642 void init(Cluster cluster) { 1643 this.cluster = cluster; 1644 if (cluster.numRacks <= 1) { 1645 maxCost = 0; 1646 return; // disabled for 1 rack 1647 } 1648 // max cost is the case where every region replica is hosted together regardless of rack 1649 maxCost = getMaxCost(cluster); 1650 costsPerGroup = new long[cluster.numRacks]; 1651 for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) { 1652 costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]); 1653 } 1654 } 1655 1656 @Override 1657 protected void regionMoved(int region, int oldServer, int newServer) { 1658 if (maxCost <= 0) { 1659 return; // no need to compute 1660 } 1661 int oldRack = cluster.serverIndexToRackIndex[oldServer]; 1662 int newRack = cluster.serverIndexToRackIndex[newServer]; 1663 if (newRack != oldRack) { 1664 costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]); 1665 costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]); 1666 } 1667 } 1668 } 1669 1670 /** 1671 * Compute the cost of total memstore size. The more unbalanced the higher the 1672 * computed cost will be. This uses a rolling average of regionload. 
1673 */ 1674 static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction { 1675 1676 private static final String MEMSTORE_SIZE_COST_KEY = 1677 "hbase.master.balancer.stochastic.memstoreSizeCost"; 1678 private static final float DEFAULT_MEMSTORE_SIZE_COST = 5; 1679 1680 MemStoreSizeCostFunction(Configuration conf) { 1681 super(conf); 1682 this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST)); 1683 } 1684 1685 @Override 1686 protected double getCostFromRl(BalancerRegionLoad rl) { 1687 return rl.getMemStoreSizeMB(); 1688 } 1689 } 1690 /** 1691 * Compute the cost of total open storefiles size. The more unbalanced the higher the 1692 * computed cost will be. This uses a rolling average of regionload. 1693 */ 1694 static class StoreFileCostFunction extends CostFromRegionLoadFunction { 1695 1696 private static final String STOREFILE_SIZE_COST_KEY = 1697 "hbase.master.balancer.stochastic.storefileSizeCost"; 1698 private static final float DEFAULT_STOREFILE_SIZE_COST = 5; 1699 1700 StoreFileCostFunction(Configuration conf) { 1701 super(conf); 1702 this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST)); 1703 } 1704 1705 @Override 1706 protected double getCostFromRl(BalancerRegionLoad rl) { 1707 return rl.getStorefileSizeMB(); 1708 } 1709 } 1710 1711 /** 1712 * A helper function to compose the attribute name from tablename and costfunction name 1713 */ 1714 public static String composeAttributeName(String tableName, String costFunctionName) { 1715 return tableName + TABLE_FUNCTION_SEP + costFunctionName; 1716 } 1717}