001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.util.ArrayDeque; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Deque; 025import java.util.HashMap; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Objects; 030import java.util.Random; 031import java.util.stream.Collectors; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.ClusterMetrics; 034import org.apache.hadoop.hbase.HBaseInterfaceAudience; 035import org.apache.hadoop.hbase.RegionMetrics; 036import org.apache.hadoop.hbase.ServerMetrics; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.BalancerDecision; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.master.MasterServices; 042import org.apache.hadoop.hbase.master.RegionPlan; 043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action; 044import 
org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; 045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction; 046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType; 047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; 048import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction; 049import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; 050import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 051import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.hadoop.hbase.util.ReflectionUtils; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 059 060/** 061 * <p>This is a best effort load balancer. Given a Cost function F(C) => x It will 062 * randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the 063 * new cluster state becomes the plan. It includes costs functions to compute the cost of:</p> 064 * <ul> 065 * <li>Region Load</li> 066 * <li>Table Load</li> 067 * <li>Data Locality</li> 068 * <li>Memstore Sizes</li> 069 * <li>Storefile Sizes</li> 070 * </ul> 071 * 072 * 073 * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost 074 * best solution, and 1 is the highest possible cost and the worst solution. 
The computed costs are
 * scaled by their respective multipliers:</p>
 *
 * <ul>
 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
 *   <li>hbase.master.balancer.stochastic.moveCost</li>
 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
 *   <li>hbase.master.balancer.stochastic.localityCost</li>
 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
 * </ul>
 *
 * <p>You can also add custom cost functions by setting the following configuration value:</p>
 * <ul>
 *   <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
 * </ul>
 *
 * <p>All custom cost functions need to extend {@link StochasticLoadBalancer.CostFunction}.</p>
 *
 * <p>In addition to the above configurations, the balancer can be tuned by the following
 * configuration values:</p>
 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMovePercent, which controls the maximum fraction of
 *   the regions that can be moved in a single invocation of this balancer.</li>
 *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
 *   regions times the number of servers is multiplied to get the number of times the balancer
 *   will try to mutate the cluster.</li>
 *   <li>hbase.master.balancer.stochastic.maxSteps, which controls the maximum number of times that
 *   the balancer will try and mutate all the servers.
The balancer will use the minimum of this 104 * value and the above computation.</li> 105 * </ul> 106 * 107 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false 108 * so that the balancer gets the full picture of all loads on the cluster.</p> 109 */ 110@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 111@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC", 112 justification="Complaint is about costFunctions not being synchronized; not end of the world") 113public class StochasticLoadBalancer extends BaseLoadBalancer { 114 115 protected static final String STEPS_PER_REGION_KEY = 116 "hbase.master.balancer.stochastic.stepsPerRegion"; 117 protected static final String MAX_STEPS_KEY = 118 "hbase.master.balancer.stochastic.maxSteps"; 119 protected static final String RUN_MAX_STEPS_KEY = 120 "hbase.master.balancer.stochastic.runMaxSteps"; 121 protected static final String MAX_RUNNING_TIME_KEY = 122 "hbase.master.balancer.stochastic.maxRunningTime"; 123 protected static final String KEEP_REGION_LOADS = 124 "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; 125 private static final String TABLE_FUNCTION_SEP = "_"; 126 protected static final String MIN_COST_NEED_BALANCE_KEY = 127 "hbase.master.balancer.stochastic.minCostNeedBalance"; 128 protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY = 129 "hbase.master.balancer.stochastic.additionalCostFunctions"; 130 131 protected static final Random RANDOM = new Random(System.currentTimeMillis()); 132 private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class); 133 134 Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>(); 135 136 // values are defaults 137 private int maxSteps = 1000000; 138 private boolean runMaxSteps = false; 139 private int stepsPerRegion = 800; 140 private long maxRunningTime = 30 * 1000 * 1; // 30 seconds. 
141 private int numRegionLoadsToRemember = 15; 142 private float minCostNeedBalance = 0.05f; 143 144 private List<CandidateGenerator> candidateGenerators; 145 private CostFromRegionLoadFunction[] regionLoadFunctions; 146 private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC 147 148 // to save and report costs to JMX 149 private Double curOverallCost = 0d; 150 private Double[] tempFunctionCosts; 151 private Double[] curFunctionCosts; 152 153 // Keep locality based picker and cost function to alert them 154 // when new services are offered 155 private LocalityBasedCandidateGenerator localityCandidateGenerator; 156 private ServerLocalityCostFunction localityCost; 157 private RackLocalityCostFunction rackLocalityCost; 158 private RegionReplicaHostCostFunction regionReplicaHostCostFunction; 159 private RegionReplicaRackCostFunction regionReplicaRackCostFunction; 160 161 /** 162 * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its 163 * default MetricsBalancer 164 */ 165 public StochasticLoadBalancer() { 166 super(new MetricsStochasticBalancer()); 167 } 168 169 @Override 170 public void onConfigurationChange(Configuration conf) { 171 setConf(conf); 172 } 173 174 @Override 175 public synchronized void setConf(Configuration conf) { 176 super.setConf(conf); 177 maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps); 178 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); 179 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime); 180 runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps); 181 182 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); 183 minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance); 184 if (localityCandidateGenerator == null) { 185 localityCandidateGenerator = new LocalityBasedCandidateGenerator(services); 186 } 187 localityCost = new ServerLocalityCostFunction(conf, services); 188 
rackLocalityCost = new RackLocalityCostFunction(conf, services); 189 190 if (this.candidateGenerators == null) { 191 candidateGenerators = Lists.newArrayList(); 192 candidateGenerators.add(new RandomCandidateGenerator()); 193 candidateGenerators.add(new LoadCandidateGenerator()); 194 candidateGenerators.add(localityCandidateGenerator); 195 candidateGenerators.add(new RegionReplicaRackCandidateGenerator()); 196 } 197 regionLoadFunctions = new CostFromRegionLoadFunction[] { 198 new ReadRequestCostFunction(conf), 199 new WriteRequestCostFunction(conf), 200 new MemStoreSizeCostFunction(conf), 201 new StoreFileCostFunction(conf) 202 }; 203 regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); 204 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); 205 206 costFunctions = new ArrayList<>(); 207 addCostFunction(new RegionCountSkewCostFunction(conf)); 208 addCostFunction(new PrimaryRegionCountSkewCostFunction(conf)); 209 addCostFunction(new MoveCostFunction(conf)); 210 addCostFunction(localityCost); 211 addCostFunction(rackLocalityCost); 212 addCostFunction(new TableSkewCostFunction(conf)); 213 addCostFunction(regionReplicaHostCostFunction); 214 addCostFunction(regionReplicaRackCostFunction); 215 addCostFunction(regionLoadFunctions[0]); 216 addCostFunction(regionLoadFunctions[1]); 217 addCostFunction(regionLoadFunctions[2]); 218 addCostFunction(regionLoadFunctions[3]); 219 loadCustomCostFunctions(conf); 220 221 curFunctionCosts = new Double[costFunctions.size()]; 222 tempFunctionCosts = new Double[costFunctions.size()]; 223 224 boolean isBalancerDecisionRecording = getConf() 225 .getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED, 226 BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED); 227 if (this.namedQueueRecorder == null && isBalancerDecisionRecording) { 228 this.namedQueueRecorder = NamedQueueRecorder.getInstance(getConf()); 229 } 230 231 LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + 
stepsPerRegion + 232 ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" + 233 Arrays.toString(getCostFunctionNames()) + " etc."); 234 } 235 236 private void loadCustomCostFunctions(Configuration conf) { 237 String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY); 238 239 if (null == functionsNames) { 240 return; 241 } 242 243 costFunctions.addAll(Arrays.stream(functionsNames).map(c -> { 244 Class<? extends CostFunction> klass = null; 245 try { 246 klass = (Class<? extends CostFunction>) Class.forName(c); 247 } catch (ClassNotFoundException e) { 248 LOG.warn("Cannot load class " + c + "': " + e.getMessage()); 249 } 250 if (null == klass) { 251 return null; 252 } 253 CostFunction reflected = ReflectionUtils.newInstance(klass, conf); 254 LOG.info( 255 "Successfully loaded custom CostFunction '" + reflected.getClass().getSimpleName() + "'"); 256 return reflected; 257 }).filter(Objects::nonNull).collect(Collectors.toList())); 258 } 259 260 protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) { 261 this.candidateGenerators = customCandidateGenerators; 262 } 263 264 @Override 265 protected void setSlop(Configuration conf) { 266 this.slop = conf.getFloat("hbase.regions.slop", 0.001F); 267 } 268 269 @Override 270 public synchronized void setClusterMetrics(ClusterMetrics st) { 271 super.setClusterMetrics(st); 272 updateRegionLoad(); 273 for(CostFromRegionLoadFunction cost : regionLoadFunctions) { 274 cost.setClusterMetrics(st); 275 } 276 277 // update metrics size 278 try { 279 // by-table or ensemble mode 280 int tablesCount = isByTable ? 
services.getTableDescriptors().getAll().size() : 1; 281 int functionsCount = getCostFunctionNames().length; 282 283 updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall 284 } catch (Exception e) { 285 LOG.error("failed to get the size of all tables", e); 286 } 287 } 288 289 /** 290 * Update the number of metrics that are reported to JMX 291 */ 292 public void updateMetricsSize(int size) { 293 if (metricsBalancer instanceof MetricsStochasticBalancer) { 294 ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size); 295 } 296 } 297 298 @Override 299 public synchronized void setMasterServices(MasterServices masterServices) { 300 super.setMasterServices(masterServices); 301 this.localityCost.setServices(masterServices); 302 this.rackLocalityCost.setServices(masterServices); 303 this.localityCandidateGenerator.setServices(masterServices); 304 } 305 306 @Override 307 protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) { 308 regionReplicaHostCostFunction.init(c); 309 if (regionReplicaHostCostFunction.cost() > 0) return true; 310 regionReplicaRackCostFunction.init(c); 311 if (regionReplicaRackCostFunction.cost() > 0) return true; 312 return false; 313 } 314 315 @Override 316 protected boolean needsBalance(TableName tableName, Cluster cluster) { 317 ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); 318 if (cs.getNumServers() < MIN_SERVER_BALANCE) { 319 if (LOG.isDebugEnabled()) { 320 LOG.debug("Not running balancer because only " + cs.getNumServers() 321 + " active regionserver(s)"); 322 } 323 return false; 324 } 325 if (areSomeRegionReplicasColocated(cluster)) { 326 return true; 327 } 328 329 if (idleRegionServerExist(cluster)){ 330 return true; 331 } 332 333 double total = 0.0; 334 float sumMultiplier = 0.0f; 335 for (CostFunction c : costFunctions) { 336 float multiplier = c.getMultiplier(); 337 if (multiplier <= 0) { 338 LOG.trace("{} not needed because multiplier is <= 0", 
c.getClass().getSimpleName()); 339 continue; 340 } 341 if (!c.isNeeded()) { 342 LOG.trace("{} not needed", c.getClass().getSimpleName()); 343 continue; 344 } 345 sumMultiplier += multiplier; 346 total += c.cost() * multiplier; 347 } 348 349 boolean balanced = total <= 0 || sumMultiplier <= 0 || 350 (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance); 351 if (LOG.isDebugEnabled()) { 352 LOG.debug("{} {}; total cost={}, sum multiplier={}; cost/multiplier to need a balance is {}", 353 balanced ? "Skipping load balancing because balanced" : "We need to load balance", 354 isByTable ? String.format("table (%s)", tableName) : "cluster", 355 total, sumMultiplier, minCostNeedBalance); 356 if (LOG.isTraceEnabled()) { 357 LOG.trace("Balance decision detailed function costs={}", functionCost()); 358 } 359 } 360 return !balanced; 361 } 362 363 @InterfaceAudience.Private 364 Cluster.Action nextAction(Cluster cluster) { 365 return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size())) 366 .generate(cluster); 367 } 368 369 /** 370 * Given the cluster state this will try and approach an optimal balance. This 371 * should always approach the optimal state given enough steps. 372 */ 373 @Override 374 public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<ServerName, 375 List<RegionInfo>> loadOfOneTable) { 376 List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable); 377 if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) { 378 return plans; 379 } 380 381 if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) { 382 if (loadOfOneTable.size() <= 2) { 383 return null; 384 } 385 loadOfOneTable = new HashMap<>(loadOfOneTable); 386 loadOfOneTable.remove(masterServerName); 387 } 388 389 // On clusters with lots of HFileLinks or lots of reference files, 390 // instantiating the storefile infos can be quite expensive. 
391 // Allow turning this feature off if the locality cost is not going to 392 // be used in any computations. 393 RegionLocationFinder finder = null; 394 if ((this.localityCost != null && this.localityCost.getMultiplier() > 0) 395 || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) { 396 finder = this.regionFinder; 397 } 398 399 //The clusterState that is given to this method contains the state 400 //of all the regions in the table(s) (that's true today) 401 // Keep track of servers to iterate through them. 402 Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager); 403 404 long startTime = EnvironmentEdgeManager.currentTime(); 405 406 initCosts(cluster); 407 408 if (!needsBalance(tableName, cluster)) { 409 return null; 410 } 411 412 double currentCost = computeCost(cluster, Double.MAX_VALUE); 413 curOverallCost = currentCost; 414 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 415 double initCost = currentCost; 416 double newCost; 417 418 long computedMaxSteps; 419 if (runMaxSteps) { 420 computedMaxSteps = Math.max(this.maxSteps, 421 ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); 422 } else { 423 long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion * 424 (long)cluster.numServers; 425 computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps); 426 if (calculatedMaxSteps > maxSteps) { 427 LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than " 428 + "maxSteps:{}. Hence load balancing may not work well. Setting parameter " 429 + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue." 
430 + "(This config change does not require service restart)", calculatedMaxSteps, 431 maxSteps); 432 } 433 } 434 LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost=" 435 + functionCost() + " computedMaxSteps: " + computedMaxSteps); 436 437 final String initFunctionTotalCosts = totalCostsPerFunc(); 438 // Perform a stochastic walk to see if we can get a good fit. 439 long step; 440 441 for (step = 0; step < computedMaxSteps; step++) { 442 Cluster.Action action = nextAction(cluster); 443 444 if (action.type == Type.NULL) { 445 continue; 446 } 447 448 cluster.doAction(action); 449 updateCostsWithAction(cluster, action); 450 451 newCost = computeCost(cluster, currentCost); 452 453 // Should this be kept? 454 if (newCost < currentCost) { 455 currentCost = newCost; 456 457 // save for JMX 458 curOverallCost = currentCost; 459 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 460 } else { 461 // Put things back the way they were before. 462 // TODO: undo by remembering old values 463 Action undoAction = action.undoAction(); 464 cluster.doAction(undoAction); 465 updateCostsWithAction(cluster, undoAction); 466 } 467 468 if (EnvironmentEdgeManager.currentTime() - startTime > 469 maxRunningTime) { 470 break; 471 } 472 } 473 long endTime = EnvironmentEdgeManager.currentTime(); 474 475 metricsBalancer.balanceCluster(endTime - startTime); 476 477 // update costs metrics 478 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 479 if (initCost > currentCost) { 480 plans = createRegionPlans(cluster); 481 LOG.info("Finished computing new load balance plan. Computation took {}" + 482 " to try {} different iterations. 
Found a solution that moves " + 483 "{} regions; Going from a computed cost of {}" + 484 " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime), 485 step, plans.size(), initCost, currentCost); 486 sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step); 487 return plans; 488 } 489 LOG.info("Could not find a better load balance plan. Tried {} different configurations in " + 490 "{}, and did not find anything with a computed cost less than {}", step, 491 java.time.Duration.ofMillis(endTime - startTime), initCost); 492 return null; 493 } 494 495 private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost, 496 double initCost, String initFunctionTotalCosts, long step) { 497 if (this.namedQueueRecorder != null) { 498 List<String> regionPlans = new ArrayList<>(); 499 for (RegionPlan plan : plans) { 500 regionPlans.add( 501 "table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName() 502 + " , source: " + plan.getSource() + " , destination: " + plan.getDestination()); 503 } 504 BalancerDecision balancerDecision = 505 new BalancerDecision.Builder() 506 .setInitTotalCost(initCost) 507 .setInitialFunctionCosts(initFunctionTotalCosts) 508 .setComputedTotalCost(currentCost) 509 .setFinalFunctionCosts(totalCostsPerFunc()) 510 .setComputedSteps(step) 511 .setRegionPlans(regionPlans).build(); 512 namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision)); 513 } 514 } 515 516 /** 517 * update costs to JMX 518 */ 519 private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) { 520 if (tableName == null) return; 521 522 // check if the metricsBalancer is MetricsStochasticBalancer before casting 523 if (metricsBalancer instanceof MetricsStochasticBalancer) { 524 MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer; 525 // overall cost 526 balancer.updateStochasticCost(tableName.getNameAsString(), 527 "Overall", 
"Overall cost", overall); 528 529 // each cost function 530 for (int i = 0; i < costFunctions.size(); i++) { 531 CostFunction costFunction = costFunctions.get(i); 532 String costFunctionName = costFunction.getClass().getSimpleName(); 533 Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall); 534 // TODO: cost function may need a specific description 535 balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName, 536 "The percent of " + costFunctionName, costPercent); 537 } 538 } 539 } 540 541 private void addCostFunction(CostFunction costFunction) { 542 if (costFunction.getMultiplier() > 0) { 543 costFunctions.add(costFunction); 544 } 545 } 546 547 private String functionCost() { 548 StringBuilder builder = new StringBuilder(); 549 for (CostFunction c:costFunctions) { 550 builder.append(c.getClass().getSimpleName()); 551 builder.append(" : ("); 552 builder.append(c.getMultiplier()); 553 builder.append(", "); 554 builder.append(c.cost()); 555 builder.append("); "); 556 } 557 return builder.toString(); 558 } 559 560 private String totalCostsPerFunc() { 561 StringBuilder builder = new StringBuilder(); 562 for (CostFunction c : costFunctions) { 563 if (c.getMultiplier() * c.cost() > 0.0) { 564 builder.append(" "); 565 builder.append(c.getClass().getSimpleName()); 566 builder.append(" : "); 567 builder.append(c.getMultiplier() * c.cost()); 568 builder.append(";"); 569 } 570 } 571 if (builder.length() > 0) { 572 builder.deleteCharAt(builder.length() - 1); 573 } 574 return builder.toString(); 575 } 576 577 /** 578 * Create all of the RegionPlan's needed to move from the initial cluster state to the desired 579 * state. 580 * 581 * @param cluster The state of the cluster 582 * @return List of RegionPlan's that represent the moves needed to get to desired final state. 
583 */ 584 private List<RegionPlan> createRegionPlans(Cluster cluster) { 585 List<RegionPlan> plans = new LinkedList<>(); 586 for (int regionIndex = 0; 587 regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) { 588 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex]; 589 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex]; 590 591 if (initialServerIndex != newServerIndex) { 592 RegionInfo region = cluster.regions[regionIndex]; 593 ServerName initialServer = cluster.servers[initialServerIndex]; 594 ServerName newServer = cluster.servers[newServerIndex]; 595 596 if (LOG.isTraceEnabled()) { 597 LOG.trace("Moving Region " + region.getEncodedName() + " from server " 598 + initialServer.getHostname() + " to " + newServer.getHostname()); 599 } 600 RegionPlan rp = new RegionPlan(region, initialServer, newServer); 601 plans.add(rp); 602 } 603 } 604 return plans; 605 } 606 607 /** 608 * Store the current region loads. 609 */ 610 private synchronized void updateRegionLoad() { 611 // We create a new hashmap so that regions that are no longer there are removed. 612 // However we temporarily need the old loads so we can use them to keep the rolling average. 
613 Map<String, Deque<BalancerRegionLoad>> oldLoads = loads; 614 loads = new HashMap<>(); 615 616 clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> { 617 sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> { 618 String regionNameAsString = RegionInfo.getRegionNameAsString(regionName); 619 Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString); 620 if (rLoads == null) { 621 rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1); 622 } else if (rLoads.size() >= numRegionLoadsToRemember) { 623 rLoads.remove(); 624 } 625 rLoads.add(new BalancerRegionLoad(rm)); 626 loads.put(regionNameAsString, rLoads); 627 }); 628 }); 629 630 for(CostFromRegionLoadFunction cost : regionLoadFunctions) { 631 cost.setLoads(loads); 632 } 633 } 634 635 protected void initCosts(Cluster cluster) { 636 for (CostFunction c:costFunctions) { 637 c.init(cluster); 638 } 639 } 640 641 protected void updateCostsWithAction(Cluster cluster, Action action) { 642 for (CostFunction c : costFunctions) { 643 c.postAction(action); 644 } 645 } 646 647 /** 648 * Get the names of the cost functions 649 */ 650 public String[] getCostFunctionNames() { 651 if (costFunctions == null) return null; 652 String[] ret = new String[costFunctions.size()]; 653 for (int i = 0; i < costFunctions.size(); i++) { 654 CostFunction c = costFunctions.get(i); 655 ret[i] = c.getClass().getSimpleName(); 656 } 657 658 return ret; 659 } 660 661 /** 662 * This is the main cost function. It will compute a cost associated with a proposed cluster 663 * state. All different costs will be combined with their multipliers to produce a double cost. 664 * 665 * @param cluster The state of the cluster 666 * @param previousCost the previous cost. This is used as an early out. 667 * @return a double of a cost associated with the proposed cluster state. This cost is an 668 * aggregate of all individual cost functions. 
669 */ 670 protected double computeCost(Cluster cluster, double previousCost) { 671 double total = 0; 672 673 for (int i = 0; i < costFunctions.size(); i++) { 674 CostFunction c = costFunctions.get(i); 675 this.tempFunctionCosts[i] = 0.0; 676 677 if (c.getMultiplier() <= 0) { 678 continue; 679 } 680 681 Float multiplier = c.getMultiplier(); 682 Double cost = c.cost(); 683 684 this.tempFunctionCosts[i] = multiplier*cost; 685 total += this.tempFunctionCosts[i]; 686 687 if (total > previousCost) { 688 break; 689 } 690 } 691 692 return total; 693 } 694 695 static class RandomCandidateGenerator extends CandidateGenerator { 696 697 @Override 698 Cluster.Action generate(Cluster cluster) { 699 700 int thisServer = pickRandomServer(cluster); 701 702 // Pick the other server 703 int otherServer = pickOtherRandomServer(cluster, thisServer); 704 705 return pickRandomRegions(cluster, thisServer, otherServer); 706 } 707 } 708 709 /** 710 * Generates candidates which moves the replicas out of the rack for 711 * co-hosted region replicas in the same rack 712 */ 713 static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator { 714 @Override 715 Cluster.Action generate(Cluster cluster) { 716 int rackIndex = pickRandomRack(cluster); 717 if (cluster.numRacks <= 1 || rackIndex == -1) { 718 return super.generate(cluster); 719 } 720 721 int regionIndex = selectCoHostedRegionPerGroup( 722 cluster.primariesOfRegionsPerRack[rackIndex], 723 cluster.regionsPerRack[rackIndex], 724 cluster.regionIndexToPrimaryIndex); 725 726 // if there are no pairs of region replicas co-hosted, default to random generator 727 if (regionIndex == -1) { 728 // default to randompicker 729 return randomGenerator.generate(cluster); 730 } 731 732 int serverIndex = cluster.regionIndexToServerIndex[regionIndex]; 733 int toRackIndex = pickOtherRandomRack(cluster, rackIndex); 734 735 int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length); 736 int toServerIndex = 
cluster.serversPerRack[toRackIndex][rand]; 737 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f); 738 return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex); 739 } 740 } 741 742 /** 743 * Base class of StochasticLoadBalancer's Cost Functions. 744 */ 745 public abstract static class CostFunction { 746 747 private float multiplier = 0; 748 749 protected Cluster cluster; 750 751 public CostFunction(Configuration c) { 752 } 753 754 boolean isNeeded() { 755 return true; 756 } 757 float getMultiplier() { 758 return multiplier; 759 } 760 761 void setMultiplier(float m) { 762 this.multiplier = m; 763 } 764 765 /** Called once per LB invocation to give the cost function 766 * to initialize it's state, and perform any costly calculation. 767 */ 768 void init(Cluster cluster) { 769 this.cluster = cluster; 770 } 771 772 /** Called once per cluster Action to give the cost function 773 * an opportunity to update it's state. postAction() is always 774 * called at least once before cost() is called with the cluster 775 * that this action is performed on. 
*/ 776 void postAction(Action action) { 777 switch (action.type) { 778 case NULL: break; 779 case ASSIGN_REGION: 780 AssignRegionAction ar = (AssignRegionAction) action; 781 regionMoved(ar.region, -1, ar.server); 782 break; 783 case MOVE_REGION: 784 MoveRegionAction mra = (MoveRegionAction) action; 785 regionMoved(mra.region, mra.fromServer, mra.toServer); 786 break; 787 case SWAP_REGIONS: 788 SwapRegionsAction a = (SwapRegionsAction) action; 789 regionMoved(a.fromRegion, a.fromServer, a.toServer); 790 regionMoved(a.toRegion, a.toServer, a.fromServer); 791 break; 792 default: 793 throw new RuntimeException("Uknown action:" + action.type); 794 } 795 } 796 797 protected void regionMoved(int region, int oldServer, int newServer) { 798 } 799 800 protected abstract double cost(); 801 802 @SuppressWarnings("checkstyle:linelength") 803 /** 804 * Function to compute a scaled cost using 805 * {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}. 806 * It assumes that this is a zero sum set of costs. It assumes that the worst case 807 * possible is all of the elements in one region server and the rest having 0. 808 * 809 * @param stats the costs 810 * @return a scaled set of costs. 811 */ 812 protected double costFromArray(double[] stats) { 813 double totalCost = 0; 814 double total = getSum(stats); 815 816 double count = stats.length; 817 double mean = total/count; 818 819 // Compute max as if all region servers had 0 and one had the sum of all costs. This must be 820 // a zero sum cost for this to make sense. 821 double max = ((count - 1) * mean) + (total - mean); 822 823 // It's possible that there aren't enough regions to go around 824 double min; 825 if (count > total) { 826 min = ((count - total) * mean) + ((1 - mean) * total); 827 } else { 828 // Some will have 1 more than everything else. 
829 int numHigh = (int) (total - (Math.floor(mean) * count)); 830 int numLow = (int) (count - numHigh); 831 832 min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean))); 833 834 } 835 min = Math.max(0, min); 836 for (int i=0; i<stats.length; i++) { 837 double n = stats[i]; 838 double diff = Math.abs(mean - n); 839 totalCost += diff; 840 } 841 842 double scaled = scale(min, max, totalCost); 843 return scaled; 844 } 845 846 private double getSum(double[] stats) { 847 double total = 0; 848 for(double s:stats) { 849 total += s; 850 } 851 return total; 852 } 853 854 /** 855 * Scale the value between 0 and 1. 856 * 857 * @param min Min value 858 * @param max The Max value 859 * @param value The value to be scaled. 860 * @return The scaled value. 861 */ 862 protected double scale(double min, double max, double value) { 863 if (max <= min || value <= min) { 864 return 0; 865 } 866 if ((max - min) == 0) return 0; 867 868 return Math.max(0d, Math.min(1d, (value - min) / (max - min))); 869 } 870 } 871 872 /** 873 * Given the starting state of the regions and a potential ending state 874 * compute cost based upon the number of regions that have moved. 
875 */ 876 static class MoveCostFunction extends CostFunction { 877 private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost"; 878 private static final String MOVE_COST_OFFPEAK_KEY = 879 "hbase.master.balancer.stochastic.moveCost.offpeak"; 880 private static final String MAX_MOVES_PERCENT_KEY = 881 "hbase.master.balancer.stochastic.maxMovePercent"; 882 static final float DEFAULT_MOVE_COST = 7; 883 static final float DEFAULT_MOVE_COST_OFFPEAK = 3; 884 private static final int DEFAULT_MAX_MOVES = 600; 885 private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f; 886 887 private final float maxMovesPercent; 888 private final Configuration conf; 889 890 MoveCostFunction(Configuration conf) { 891 super(conf); 892 this.conf = conf; 893 // What percent of the number of regions a single run of the balancer can move. 894 maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT); 895 } 896 897 @Override 898 protected double cost() { 899 // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure 900 // that large benefits are need to overcome the cost of a move. 901 if (OffPeakHours.getInstance(conf).isOffPeakHour()) { 902 this.setMultiplier(conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK)); 903 } else { 904 this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST)); 905 } 906 // Try and size the max number of Moves, but always be prepared to move some. 907 int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent), 908 DEFAULT_MAX_MOVES); 909 910 double moveCost = cluster.numMovedRegions; 911 912 // Don't let this single balance move more than the max moves. 913 // This allows better scaling to accurately represent the actual cost of a move. 
914 if (moveCost > maxMoves) { 915 return 1000000; // return a number much greater than any of the other cost 916 } 917 918 return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost); 919 } 920 } 921 922 /** 923 * Compute the cost of a potential cluster state from skew in number of 924 * regions on a cluster. 925 */ 926 static class RegionCountSkewCostFunction extends CostFunction { 927 static final String REGION_COUNT_SKEW_COST_KEY = 928 "hbase.master.balancer.stochastic.regionCountCost"; 929 static final float DEFAULT_REGION_COUNT_SKEW_COST = 500; 930 931 private double[] stats = null; 932 933 RegionCountSkewCostFunction(Configuration conf) { 934 super(conf); 935 // Load multiplier should be the greatest as it is the most general way to balance data. 936 this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST)); 937 } 938 939 @Override 940 void init(Cluster cluster) { 941 super.init(cluster); 942 LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(), 943 cluster.numServers, cluster.numRegions); 944 if (LOG.isTraceEnabled()) { 945 for (int i =0; i < cluster.numServers; i++) { 946 LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(), 947 cluster.servers[i], cluster.regionsPerServer[i].length); 948 } 949 } 950 } 951 952 @Override 953 protected double cost() { 954 if (stats == null || stats.length != cluster.numServers) { 955 stats = new double[cluster.numServers]; 956 } 957 for (int i =0; i < cluster.numServers; i++) { 958 stats[i] = cluster.regionsPerServer[i].length; 959 } 960 return costFromArray(stats); 961 } 962 } 963 964 /** 965 * Compute the cost of a potential cluster state from skew in number of 966 * primary regions on a cluster. 
967 */ 968 static class PrimaryRegionCountSkewCostFunction extends CostFunction { 969 private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY = 970 "hbase.master.balancer.stochastic.primaryRegionCountCost"; 971 private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500; 972 973 private double[] stats = null; 974 975 PrimaryRegionCountSkewCostFunction(Configuration conf) { 976 super(conf); 977 // Load multiplier should be the greatest as primary regions serve majority of reads/writes. 978 this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, 979 DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST)); 980 } 981 982 @Override 983 boolean isNeeded() { 984 return cluster.hasRegionReplicas; 985 } 986 987 @Override 988 protected double cost() { 989 if (!cluster.hasRegionReplicas) { 990 return 0; 991 } 992 if (stats == null || stats.length != cluster.numServers) { 993 stats = new double[cluster.numServers]; 994 } 995 996 for (int i = 0; i < cluster.numServers; i++) { 997 stats[i] = 0; 998 for (int regionIdx : cluster.regionsPerServer[i]) { 999 if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) { 1000 stats[i]++; 1001 } 1002 } 1003 } 1004 1005 return costFromArray(stats); 1006 } 1007 } 1008 1009 /** 1010 * Compute the cost of a potential cluster configuration based upon how evenly 1011 * distributed tables are. 
1012 */ 1013 static class TableSkewCostFunction extends CostFunction { 1014 1015 private static final String TABLE_SKEW_COST_KEY = 1016 "hbase.master.balancer.stochastic.tableSkewCost"; 1017 private static final float DEFAULT_TABLE_SKEW_COST = 35; 1018 1019 TableSkewCostFunction(Configuration conf) { 1020 super(conf); 1021 this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST)); 1022 } 1023 1024 @Override 1025 protected double cost() { 1026 double max = cluster.numRegions; 1027 double min = ((double) cluster.numRegions) / cluster.numServers; 1028 double value = 0; 1029 1030 for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) { 1031 value += cluster.numMaxRegionsPerTable[i]; 1032 } 1033 1034 return scale(min, max, value); 1035 } 1036 } 1037 1038 /** 1039 * Compute a cost of a potential cluster configuration based upon where 1040 * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located. 1041 */ 1042 static abstract class LocalityBasedCostFunction extends CostFunction { 1043 1044 private final LocalityType type; 1045 1046 private double bestLocality; // best case locality across cluster weighted by local data size 1047 private double locality; // current locality across cluster weighted by local data size 1048 1049 private MasterServices services; 1050 1051 LocalityBasedCostFunction(Configuration conf, 1052 MasterServices srv, 1053 LocalityType type, 1054 String localityCostKey, 1055 float defaultLocalityCost) { 1056 super(conf); 1057 this.type = type; 1058 this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost)); 1059 this.services = srv; 1060 this.locality = 0.0; 1061 this.bestLocality = 0.0; 1062 } 1063 1064 /** 1065 * Maps region to the current entity (server or rack) on which it is stored 1066 */ 1067 abstract int regionIndexToEntityIndex(int region); 1068 1069 public void setServices(MasterServices srvc) { 1070 this.services = srvc; 1071 } 1072 1073 @Override 1074 void init(Cluster cluster) { 1075 
super.init(cluster); 1076 locality = 0.0; 1077 bestLocality = 0.0; 1078 1079 // If no master, no computation will work, so assume 0 cost 1080 if (this.services == null) { 1081 return; 1082 } 1083 1084 for (int region = 0; region < cluster.numRegions; region++) { 1085 locality += getWeightedLocality(region, regionIndexToEntityIndex(region)); 1086 bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region)); 1087 } 1088 1089 // We normalize locality to be a score between 0 and 1.0 representing how good it 1090 // is compared to how good it could be. If bestLocality is 0, assume locality is 100 1091 // (and the cost is 0) 1092 locality = bestLocality == 0 ? 1.0 : locality / bestLocality; 1093 } 1094 1095 @Override 1096 protected void regionMoved(int region, int oldServer, int newServer) { 1097 int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer]; 1098 int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer]; 1099 if (this.services == null) { 1100 return; 1101 } 1102 double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity); 1103 double normalizedDelta = bestLocality == 0 ? 
0.0 : localityDelta / bestLocality; 1104 locality += normalizedDelta; 1105 } 1106 1107 @Override 1108 protected double cost() { 1109 return 1 - locality; 1110 } 1111 1112 private int getMostLocalEntityForRegion(int region) { 1113 return cluster.getOrComputeRegionsToMostLocalEntities(type)[region]; 1114 } 1115 1116 private double getWeightedLocality(int region, int entity) { 1117 return cluster.getOrComputeWeightedLocality(region, entity, type); 1118 } 1119 1120 } 1121 1122 static class ServerLocalityCostFunction extends LocalityBasedCostFunction { 1123 1124 private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost"; 1125 private static final float DEFAULT_LOCALITY_COST = 25; 1126 1127 ServerLocalityCostFunction(Configuration conf, MasterServices srv) { 1128 super( 1129 conf, 1130 srv, 1131 LocalityType.SERVER, 1132 LOCALITY_COST_KEY, 1133 DEFAULT_LOCALITY_COST 1134 ); 1135 } 1136 1137 @Override 1138 int regionIndexToEntityIndex(int region) { 1139 return cluster.regionIndexToServerIndex[region]; 1140 } 1141 } 1142 1143 static class RackLocalityCostFunction extends LocalityBasedCostFunction { 1144 1145 private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost"; 1146 private static final float DEFAULT_RACK_LOCALITY_COST = 15; 1147 1148 public RackLocalityCostFunction(Configuration conf, MasterServices services) { 1149 super( 1150 conf, 1151 services, 1152 LocalityType.RACK, 1153 RACK_LOCALITY_COST_KEY, 1154 DEFAULT_RACK_LOCALITY_COST 1155 ); 1156 } 1157 1158 @Override 1159 int regionIndexToEntityIndex(int region) { 1160 return cluster.getRackForRegion(region); 1161 } 1162 } 1163 1164 /** 1165 * Base class the allows writing costs functions from rolling average of some 1166 * number from RegionLoad. 
1167 */ 1168 abstract static class CostFromRegionLoadFunction extends CostFunction { 1169 1170 private ClusterMetrics clusterStatus = null; 1171 private Map<String, Deque<BalancerRegionLoad>> loads = null; 1172 private double[] stats = null; 1173 CostFromRegionLoadFunction(Configuration conf) { 1174 super(conf); 1175 } 1176 1177 void setClusterMetrics(ClusterMetrics status) { 1178 this.clusterStatus = status; 1179 } 1180 1181 void setLoads(Map<String, Deque<BalancerRegionLoad>> l) { 1182 this.loads = l; 1183 } 1184 1185 @Override 1186 protected double cost() { 1187 if (clusterStatus == null || loads == null) { 1188 return 0; 1189 } 1190 1191 if (stats == null || stats.length != cluster.numServers) { 1192 stats = new double[cluster.numServers]; 1193 } 1194 1195 for (int i =0; i < stats.length; i++) { 1196 //Cost this server has from RegionLoad 1197 long cost = 0; 1198 1199 // for every region on this server get the rl 1200 for(int regionIndex:cluster.regionsPerServer[i]) { 1201 Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex]; 1202 1203 // Now if we found a region load get the type of cost that was requested. 1204 if (regionLoadList != null) { 1205 cost = (long) (cost + getRegionLoadCost(regionLoadList)); 1206 } 1207 } 1208 1209 // Add the total cost to the stats. 1210 stats[i] = cost; 1211 } 1212 1213 // Now return the scaled cost from data held in the stats object. 1214 return costFromArray(stats); 1215 } 1216 1217 protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) { 1218 double cost = 0; 1219 for (BalancerRegionLoad rl : regionLoadList) { 1220 cost += getCostFromRl(rl); 1221 } 1222 return cost / regionLoadList.size(); 1223 } 1224 1225 protected abstract double getCostFromRl(BalancerRegionLoad rl); 1226 } 1227 1228 /** 1229 * Class to be used for the subset of RegionLoad costs that should be treated as rates. 
1230 * We do not compare about the actual rate in requests per second but rather the rate relative 1231 * to the rest of the regions. 1232 */ 1233 abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction { 1234 1235 CostFromRegionLoadAsRateFunction(Configuration conf) { 1236 super(conf); 1237 } 1238 1239 @Override 1240 protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) { 1241 double cost = 0; 1242 double previous = 0; 1243 boolean isFirst = true; 1244 for (BalancerRegionLoad rl : regionLoadList) { 1245 double current = getCostFromRl(rl); 1246 if (isFirst) { 1247 isFirst = false; 1248 } else { 1249 cost += current - previous; 1250 } 1251 previous = current; 1252 } 1253 return Math.max(0, cost / (regionLoadList.size() - 1)); 1254 } 1255 } 1256 1257 /** 1258 * Compute the cost of total number of read requests The more unbalanced the higher the 1259 * computed cost will be. This uses a rolling average of regionload. 1260 */ 1261 1262 static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction { 1263 1264 private static final String READ_REQUEST_COST_KEY = 1265 "hbase.master.balancer.stochastic.readRequestCost"; 1266 private static final float DEFAULT_READ_REQUEST_COST = 5; 1267 1268 ReadRequestCostFunction(Configuration conf) { 1269 super(conf); 1270 this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST)); 1271 } 1272 1273 @Override 1274 protected double getCostFromRl(BalancerRegionLoad rl) { 1275 return rl.getReadRequestsCount(); 1276 } 1277 } 1278 1279 /** 1280 * Compute the cost of total number of write requests. The more unbalanced the higher the 1281 * computed cost will be. This uses a rolling average of regionload. 
1282 */ 1283 static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction { 1284 1285 private static final String WRITE_REQUEST_COST_KEY = 1286 "hbase.master.balancer.stochastic.writeRequestCost"; 1287 private static final float DEFAULT_WRITE_REQUEST_COST = 5; 1288 1289 WriteRequestCostFunction(Configuration conf) { 1290 super(conf); 1291 this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST)); 1292 } 1293 1294 @Override 1295 protected double getCostFromRl(BalancerRegionLoad rl) { 1296 return rl.getWriteRequestsCount(); 1297 } 1298 } 1299 1300 /** 1301 * A cost function for region replicas. We give a very high cost to hosting 1302 * replicas of the same region in the same host. We do not prevent the case 1303 * though, since if numReplicas > numRegionServers, we still want to keep the 1304 * replica open. 1305 */ 1306 static class RegionReplicaHostCostFunction extends CostFunction { 1307 private static final String REGION_REPLICA_HOST_COST_KEY = 1308 "hbase.master.balancer.stochastic.regionReplicaHostCostKey"; 1309 private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000; 1310 1311 long maxCost = 0; 1312 long[] costsPerGroup; // group is either server, host or rack 1313 int[][] primariesOfRegionsPerGroup; 1314 1315 public RegionReplicaHostCostFunction(Configuration conf) { 1316 super(conf); 1317 this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY, 1318 DEFAULT_REGION_REPLICA_HOST_COST_KEY)); 1319 } 1320 1321 @Override 1322 void init(Cluster cluster) { 1323 super.init(cluster); 1324 // max cost is the case where every region replica is hosted together regardless of host 1325 maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0; 1326 costsPerGroup = new long[cluster.numHosts]; 1327 primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based 1328 ? 
cluster.primariesOfRegionsPerHost 1329 : cluster.primariesOfRegionsPerServer; 1330 for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) { 1331 costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]); 1332 } 1333 } 1334 1335 long getMaxCost(Cluster cluster) { 1336 if (!cluster.hasRegionReplicas) { 1337 return 0; // short circuit 1338 } 1339 // max cost is the case where every region replica is hosted together regardless of host 1340 int[] primariesOfRegions = new int[cluster.numRegions]; 1341 System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0, 1342 cluster.regions.length); 1343 1344 Arrays.sort(primariesOfRegions); 1345 1346 // compute numReplicas from the sorted array 1347 return costPerGroup(primariesOfRegions); 1348 } 1349 1350 @Override 1351 boolean isNeeded() { 1352 return cluster.hasRegionReplicas; 1353 } 1354 1355 @Override 1356 protected double cost() { 1357 if (maxCost <= 0) { 1358 return 0; 1359 } 1360 1361 long totalCost = 0; 1362 for (int i = 0 ; i < costsPerGroup.length; i++) { 1363 totalCost += costsPerGroup[i]; 1364 } 1365 return scale(0, maxCost, totalCost); 1366 } 1367 1368 /** 1369 * For each primary region, it computes the total number of replicas in the array (numReplicas) 1370 * and returns a sum of numReplicas-1 squared. For example, if the server hosts 1371 * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it 1372 * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1). 1373 * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted 1374 * @return a sum of numReplicas-1 squared for each primary region in the group. 1375 */ 1376 protected long costPerGroup(int[] primariesOfRegions) { 1377 long cost = 0; 1378 int currentPrimary = -1; 1379 int currentPrimaryIndex = -1; 1380 // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions 1381 // sharing the same primary will have consecutive numbers in the array. 
1382 for (int j = 0 ; j <= primariesOfRegions.length; j++) { 1383 int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1; 1384 if (primary != currentPrimary) { // we see a new primary 1385 int numReplicas = j - currentPrimaryIndex; 1386 // square the cost 1387 if (numReplicas > 1) { // means consecutive primaries, indicating co-location 1388 cost += (numReplicas - 1) * (numReplicas - 1); 1389 } 1390 currentPrimary = primary; 1391 currentPrimaryIndex = j; 1392 } 1393 } 1394 1395 return cost; 1396 } 1397 1398 @Override 1399 protected void regionMoved(int region, int oldServer, int newServer) { 1400 if (maxCost <= 0) { 1401 return; // no need to compute 1402 } 1403 if (cluster.multiServersPerHost) { 1404 int oldHost = cluster.serverIndexToHostIndex[oldServer]; 1405 int newHost = cluster.serverIndexToHostIndex[newServer]; 1406 if (newHost != oldHost) { 1407 costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]); 1408 costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]); 1409 } 1410 } else { 1411 costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]); 1412 costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]); 1413 } 1414 } 1415 } 1416 1417 /** 1418 * A cost function for region replicas for the rack distribution. We give a relatively high 1419 * cost to hosting replicas of the same region in the same rack. We do not prevent the case 1420 * though. 
1421 */ 1422 static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction { 1423 private static final String REGION_REPLICA_RACK_COST_KEY = 1424 "hbase.master.balancer.stochastic.regionReplicaRackCostKey"; 1425 private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000; 1426 1427 public RegionReplicaRackCostFunction(Configuration conf) { 1428 super(conf); 1429 this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, 1430 DEFAULT_REGION_REPLICA_RACK_COST_KEY)); 1431 } 1432 1433 @Override 1434 void init(Cluster cluster) { 1435 this.cluster = cluster; 1436 if (cluster.numRacks <= 1) { 1437 maxCost = 0; 1438 return; // disabled for 1 rack 1439 } 1440 // max cost is the case where every region replica is hosted together regardless of rack 1441 maxCost = getMaxCost(cluster); 1442 costsPerGroup = new long[cluster.numRacks]; 1443 for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) { 1444 costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]); 1445 } 1446 } 1447 1448 @Override 1449 protected void regionMoved(int region, int oldServer, int newServer) { 1450 if (maxCost <= 0) { 1451 return; // no need to compute 1452 } 1453 int oldRack = cluster.serverIndexToRackIndex[oldServer]; 1454 int newRack = cluster.serverIndexToRackIndex[newServer]; 1455 if (newRack != oldRack) { 1456 costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]); 1457 costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]); 1458 } 1459 } 1460 } 1461 1462 /** 1463 * Compute the cost of total memstore size. The more unbalanced the higher the 1464 * computed cost will be. This uses a rolling average of regionload. 
1465 */ 1466 static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction { 1467 1468 private static final String MEMSTORE_SIZE_COST_KEY = 1469 "hbase.master.balancer.stochastic.memstoreSizeCost"; 1470 private static final float DEFAULT_MEMSTORE_SIZE_COST = 5; 1471 1472 MemStoreSizeCostFunction(Configuration conf) { 1473 super(conf); 1474 this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST)); 1475 } 1476 1477 @Override 1478 protected double getCostFromRl(BalancerRegionLoad rl) { 1479 return rl.getMemStoreSizeMB(); 1480 } 1481 } 1482 /** 1483 * Compute the cost of total open storefiles size. The more unbalanced the higher the 1484 * computed cost will be. This uses a rolling average of regionload. 1485 */ 1486 static class StoreFileCostFunction extends CostFromRegionLoadFunction { 1487 1488 private static final String STOREFILE_SIZE_COST_KEY = 1489 "hbase.master.balancer.stochastic.storefileSizeCost"; 1490 private static final float DEFAULT_STOREFILE_SIZE_COST = 5; 1491 1492 StoreFileCostFunction(Configuration conf) { 1493 super(conf); 1494 this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST)); 1495 } 1496 1497 @Override 1498 protected double getCostFromRl(BalancerRegionLoad rl) { 1499 return rl.getStorefileSizeMB(); 1500 } 1501 } 1502 1503 /** 1504 * A helper function to compose the attribute name from tablename and costfunction name 1505 */ 1506 public static String composeAttributeName(String tableName, String costFunctionName) { 1507 return tableName + TABLE_FUNCTION_SEP + costFunctionName; 1508 } 1509}