001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.lang.reflect.Constructor;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.Deque;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.StringJoiner;
031import java.util.concurrent.ThreadLocalRandom;
032import java.util.concurrent.TimeUnit;
033import java.util.function.Supplier;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ClusterMetrics;
036import org.apache.hadoop.hbase.HBaseInterfaceAudience;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.RegionMetrics;
039import org.apache.hadoop.hbase.ServerMetrics;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.BalancerDecision;
043import org.apache.hadoop.hbase.client.BalancerRejection;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.master.RackManager;
046import org.apache.hadoop.hbase.master.RegionPlan;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.hadoop.hbase.util.ReflectionUtils;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
055
056/**
057 * <p>
058 * This is a best effort load balancer. Given a Cost function F(C) =&gt; x It will randomly try and
059 * mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the new cluster state becomes the plan.
060 * It includes costs functions to compute the cost of:
061 * </p>
062 * <ul>
063 * <li>Region Load</li>
064 * <li>Table Load</li>
065 * <li>Data Locality</li>
066 * <li>Memstore Sizes</li>
067 * <li>Storefile Sizes</li>
068 * </ul>
069 * <p>
070 * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best
071 * solution, and 1 is the highest possible cost and the worst solution. The computed costs are
072 * scaled by their respective multipliers:
073 * </p>
074 * <ul>
075 * <li>hbase.master.balancer.stochastic.regionLoadCost</li>
076 * <li>hbase.master.balancer.stochastic.moveCost</li>
077 * <li>hbase.master.balancer.stochastic.tableLoadCost</li>
078 * <li>hbase.master.balancer.stochastic.localityCost</li>
079 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
080 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
081 * </ul>
082 * <p>
083 * You can also add custom Cost functions by setting the following configuration value:
084 * </p>
085 * <ul>
086 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
087 * </ul>
088 * <p>
089 * All custom Cost Functions need to extend {@link CostFunction}
090 * </p>
091 * <p>
092 * In addition to the above configurations, the balancer can be tuned by the following configuration
093 * values:
094 * </p>
095 * <ul>
096 * <li>hbase.master.balancer.stochastic.maxMoveRegions which controls what the max number of regions
097 * that can be moved in a single invocation of this balancer.</li>
098 * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
099 * regions is multiplied to try and get the number of times the balancer will mutate all
100 * servers.</li>
101 * <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that the
102 * balancer will try and mutate all the servers. The balancer will use the minimum of this value and
103 * the above computation.</li>
104 * </ul>
105 * <p>
106 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the
107 * balancer gets the full picture of all loads on the cluster.
108 * </p>
109 */
110@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
111public class StochasticLoadBalancer extends BaseLoadBalancer {
112
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Configuration keys and their defaults. The values are read in loadConf(Configuration).

  // Factor multiplied with the number of regions and servers to size the stochastic walk
  // (see calculateMaxSteps).
  protected static final String STEPS_PER_REGION_KEY =
    "hbase.master.balancer.stochastic.stepsPerRegion";
  protected static final int DEFAULT_STEPS_PER_REGION = 800;
  // Step limit that balanceTable combines with the calculated step count.
  protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
  protected static final int DEFAULT_MAX_STEPS = 1000000;
  // When true, balanceTable takes the larger of maxSteps and the calculated step count instead
  // of the smaller one.
  protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps";
  protected static final boolean DEFAULT_RUN_MAX_STEPS = false;
  // Time budget of a single balanceTable run, in milliseconds.
  protected static final String MAX_RUNNING_TIME_KEY =
    "hbase.master.balancer.stochastic.maxRunningTime";
  protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds.
  // Loaded into numRegionLoadsToRemember.
  // NOTE(review): presumably bounds the per-region history kept in 'loads' - confirm against
  // updateRegionLoad.
  protected static final String KEEP_REGION_LOADS =
    "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  protected static final int DEFAULT_KEEP_REGION_LOADS = 15;
  private static final String TABLE_FUNCTION_SEP = "_";
  // needsBalance skips balancing when the weighted average cost is below this threshold.
  protected static final String MIN_COST_NEED_BALANCE_KEY =
    "hbase.master.balancer.stochastic.minCostNeedBalance";
  protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f;
  // Names of additional CostFunction classes, read with Configuration#getStrings in
  // loadCustomCostFunctions.
  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
    "hbase.master.balancer.stochastic.additionalCostFunctions";
  public static final String OVERALL_COST_FUNCTION_NAME = "Overall";

  // Region load history handed to every BalancerClusterState built by this class.
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // values are defaults
  private int maxSteps = DEFAULT_MAX_STEPS;
  private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS;
  private int stepsPerRegion = DEFAULT_STEPS_PER_REGION;
  private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
  private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
  private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
  // Handed to the BalancerClusterState that balanceTable builds.
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>();

  protected List<CostFunction> costFunctions; // FindBugs: Wants this protected;
                                              // IS2_INCONSISTENT_SYNC
  // Sum of the multipliers of the cost functions that are currently needed. It has no explicit
  // initializer, so it stays 0 until needsBalance or balanceTable recomputes it.
  private float sumMultiplier;
  // to save and report costs to JMX
  private double curOverallCost = 0d;
  // Copied into curFunctionCosts after each accepted cost computation.
  private double[] tempFunctionCosts;
  private double[] curFunctionCosts;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

  // Selection weights read by getRandomGenerator; a generator class without an entry counts as 0.
  private final Map<Class<? extends CandidateGenerator>, Double> weightsOfGenerators =
    new HashMap<>();
  // Rebuilt by createCandidateGenerators each time loadConf runs.
  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
  // Shuffled copy of the candidateGenerators key set, memoized with a 5 second expiration. It
  // can therefore briefly lag behind a candidateGenerators map replaced by loadConf.
  protected final Supplier<List<Class<? extends CandidateGenerator>>> shuffledGeneratorClasses =
    Suppliers.memoizeWithExpiration(() -> {
      List<Class<? extends CandidateGenerator>> shuffled =
        new ArrayList<>(candidateGenerators.keySet());
      Collections.shuffle(shuffled);
      return shuffled;
    }, 5, TimeUnit.SECONDS);

  private final BalancerConditionals balancerConditionals = BalancerConditionals.create();
176
177  /**
178   * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
179   * default MetricsBalancer
180   */
181  public StochasticLoadBalancer() {
182    super(new MetricsStochasticBalancer());
183  }
184
185  @RestrictedApi(explanation = "Should only be called in tests", link = "",
186      allowedOnPath = ".*/src/test/.*")
187  public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) {
188    super(metricsStochasticBalancer);
189  }
190
191  private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
192    Configuration conf) {
193    try {
194      Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class);
195      return ReflectionUtils.instantiate(clazz.getName(), ctor, conf);
196    } catch (NoSuchMethodException e) {
197      // will try construct with no parameter
198    }
199    return ReflectionUtils.newInstance(clazz);
200  }
201
202  private void loadCustomCostFunctions(Configuration conf) {
203    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
204
205    if (null == functionsNames) {
206      return;
207    }
208    for (String className : functionsNames) {
209      Class<? extends CostFunction> clazz;
210      try {
211        clazz = Class.forName(className).asSubclass(CostFunction.class);
212      } catch (ClassNotFoundException e) {
213        LOG.warn("Cannot load class '{}': {}", className, e.getMessage());
214        continue;
215      }
216      CostFunction func = createCostFunction(clazz, conf);
217      LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
218      costFunctions.add(func);
219    }
220  }
221
222  @RestrictedApi(explanation = "Should only be called in tests", link = "",
223      allowedOnPath = ".*/src/test/.*")
224  Map<Class<? extends CandidateGenerator>, CandidateGenerator> getCandidateGenerators() {
225    return this.candidateGenerators;
226  }
227
228  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
229    createCandidateGenerators(Configuration conf) {
230    balancerConditionals.setConf(conf);
231    Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
232    if (balancerConditionals.isReplicaDistributionEnabled()) {
233      candidateGenerators = new HashMap<>(3);
234      candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator());
235      candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator());
236      candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator);
237    } else {
238      candidateGenerators = new HashMap<>(5);
239      candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator());
240      candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator());
241      candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator);
242      candidateGenerators.put(RegionReplicaCandidateGenerator.class,
243        new RegionReplicaCandidateGenerator());
244      candidateGenerators.put(RegionReplicaRackCandidateGenerator.class,
245        new RegionReplicaRackCandidateGenerator());
246    }
247    return candidateGenerators;
248  }
249
250  protected List<CostFunction> createCostFunctions(Configuration conf) {
251    List<CostFunction> costFunctions = new ArrayList<>();
252    addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf));
253    addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf));
254    addCostFunction(costFunctions, new MoveCostFunction(conf, provider));
255    addCostFunction(costFunctions, localityCost);
256    addCostFunction(costFunctions, rackLocalityCost);
257    addCostFunction(costFunctions, new TableSkewCostFunction(conf));
258    addCostFunction(costFunctions, new StoreFileTableSkewCostFunction(conf));
259    addCostFunction(costFunctions, regionReplicaHostCostFunction);
260    addCostFunction(costFunctions, regionReplicaRackCostFunction);
261    addCostFunction(costFunctions, new ReadRequestCostFunction(conf));
262    addCostFunction(costFunctions, new CPRequestCostFunction(conf));
263    addCostFunction(costFunctions, new WriteRequestCostFunction(conf));
264    addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf));
265    addCostFunction(costFunctions, new StoreFileCostFunction(conf));
266    return costFunctions;
267  }
268
269  @Override
270  protected void loadConf(Configuration conf) {
271    super.loadConf(conf);
272    maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS);
273    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION);
274    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME);
275    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS);
276
277    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS);
278    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE);
279    localityCandidateGenerator = new LocalityBasedCandidateGenerator();
280    localityCost = new ServerLocalityCostFunction(conf);
281    rackLocalityCost = new RackLocalityCostFunction(conf);
282
283    balancerConditionals.setConf(conf);
284    this.candidateGenerators = createCandidateGenerators(conf);
285
286    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
287    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
288    this.costFunctions = createCostFunctions(conf);
289    loadCustomCostFunctions(conf);
290
291    curFunctionCosts = new double[costFunctions.size()];
292    tempFunctionCosts = new double[costFunctions.size()];
293
294    LOG.info(
295      "Loaded config: maxSteps={}, runMaxSteps={}, stepsPerRegion={}, maxRunningTime={}, "
296        + "isByTable={}, CostFunctions={}, sum of multiplier of cost functions = {} etc.",
297      maxSteps, runMaxSteps, stepsPerRegion, maxRunningTime, isByTable,
298      Arrays.toString(getCostFunctionNames()), sumMultiplier);
299  }
300
301  @Override
302  public void updateClusterMetrics(ClusterMetrics st) {
303    super.updateClusterMetrics(st);
304    updateRegionLoad();
305
306    // update metrics size
307    try {
308      // by-table or ensemble mode
309      int tablesCount = isByTable ? provider.getNumberOfTables() : 1;
310      int functionsCount = getCostFunctionNames().length;
311
312      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
313    } catch (Exception e) {
314      LOG.error("failed to get the size of all tables", e);
315    }
316  }
317
318  private void updateBalancerTableLoadInfo(TableName tableName,
319    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
320    RegionHDFSBlockLocationFinder finder = null;
321    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
322      finder = this.regionFinder;
323    }
324    BalancerClusterState cluster =
325      new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
326
327    initCosts(cluster);
328    curOverallCost = computeCost(cluster, Double.MAX_VALUE);
329    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
330    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
331  }
332
333  @Override
334  public void
335    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
336    if (isByTable) {
337      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
338        updateBalancerTableLoadInfo(tableName, loadOfOneTable);
339      });
340    } else {
341      updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME,
342        toEnsumbleTableLoad(loadOfAllTable));
343    }
344  }
345
346  /**
347   * Update the number of metrics that are reported to JMX
348   */
349  @RestrictedApi(explanation = "Should only be called in tests", link = "",
350      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
351  void updateMetricsSize(int size) {
352    if (metricsBalancer instanceof MetricsStochasticBalancer) {
353      ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
354    }
355  }
356
357  private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c) {
358    if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) {
359      // This check is unnecessary without replicas, or with conditional replica distribution
360      // The balancer will auto-run if conditional replica distribution candidates are available
361      return false;
362    }
363    if (c.numHosts >= c.maxReplicas) {
364      regionReplicaHostCostFunction.prepare(c);
365      double hostCost = Math.abs(regionReplicaHostCostFunction.cost());
366      boolean colocatedAtHost = hostCost > CostFunction.getCostEpsilon(hostCost);
367      if (colocatedAtHost) {
368        return true;
369      }
370      LOG.trace("No host colocation detected with host cost={}", hostCost);
371    }
372    return false;
373  }
374
375  private boolean areSomeRegionReplicasColocatedOnRack(BalancerClusterState c) {
376    if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) {
377      // This check is unnecessary without replicas, or with conditional replica distribution
378      // The balancer will auto-run if conditional replica distribution candidates are available
379      return false;
380    }
381    if (c.numRacks >= c.maxReplicas) {
382      regionReplicaRackCostFunction.prepare(c);
383      double rackCost = Math.abs(regionReplicaRackCostFunction.cost());
384      boolean colocatedAtRack = rackCost > CostFunction.getCostEpsilon(rackCost);
385      if (colocatedAtRack) {
386        return true;
387      }
388      LOG.trace("No rack colocation detected with rack cost={}", rackCost);
389    } else {
390      LOG.trace("Rack colocation is inevitable with fewer racks than replicas, "
391        + "so we won't bother checking");
392    }
393    return false;
394  }
395
396  private String getBalanceReason(double total, double sumMultiplier) {
397    if (total <= 0) {
398      return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
399    } else if (sumMultiplier <= 0) {
400      return "sumMultiplier = " + sumMultiplier + " <= 0";
401    } else if ((total / sumMultiplier) < minCostNeedBalance) {
402      return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = "
403        + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
404    } else {
405      return "";
406    }
407  }
408
409  @RestrictedApi(explanation = "Should only be called in tests", link = "",
410      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
411  boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
412    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
413    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
414      LOG.info(
415        "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
416      sendRejectionReasonToRingBuffer(() -> "The number of RegionServers " + cs.getNumServers()
417        + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
418      return false;
419    }
420
421    if (areSomeRegionReplicasColocatedOnHost(cluster)) {
422      LOG.info("Running balancer because at least one server hosts replicas of the same region."
423        + " function cost={}", functionCost());
424      return true;
425    }
426
427    if (areSomeRegionReplicasColocatedOnRack(cluster)) {
428      LOG.info("Running balancer because at least one rack hosts replicas of the same region."
429        + " function cost={}", functionCost());
430      return true;
431    }
432
433    if (idleRegionServerExist(cluster)) {
434      LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}",
435        functionCost());
436      return true;
437    }
438
439    if (
440      // table isolation is inherently incompatible with naive "sloppy server" checks
441      !balancerConditionals.isTableIsolationEnabled() && sloppyRegionServerExist(cs)
442    ) {
443      LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}",
444        functionCost());
445      return true;
446    }
447
448    if (balancerConditionals.shouldRunBalancer(cluster)) {
449      LOG.info("Running balancer because conditional candidate generators have important moves");
450      return true;
451    }
452
453    double total = 0.0;
454    float localSumMultiplier = 0; // in case this.sumMultiplier is not initialized
455    for (CostFunction c : costFunctions) {
456      if (!c.isNeeded()) {
457        LOG.trace("{} not needed", c.getClass().getSimpleName());
458        continue;
459      }
460      total += c.cost() * c.getMultiplier();
461      localSumMultiplier += c.getMultiplier();
462    }
463    sumMultiplier = localSumMultiplier;
464    boolean balanced = (total / sumMultiplier < minCostNeedBalance);
465
466    if (balanced) {
467      final double calculatedTotal = total;
468      sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, sumMultiplier),
469        costFunctions);
470      LOG.info(
471        "{} - skipping load balancing because weighted average imbalance={} <= "
472          + "threshold({}) and conditionals do not have opinionated move candidates. "
473          + "If you want more aggressive balancing, either lower "
474          + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative "
475          + "multiplier(s) of the specific cost function(s). functionCost={}",
476        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier,
477        minCostNeedBalance, minCostNeedBalance, functionCost());
478    } else {
479      LOG.info(
480        "{} - Calculating plan. may take up to {}ms to complete. currentCost={}, targetCost={}",
481        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime, total,
482        minCostNeedBalance);
483    }
484    return !balanced;
485  }
486
487  @RestrictedApi(explanation = "Should only be called in tests", link = "",
488      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
489  Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState cluster) {
490    CandidateGenerator generator = getRandomGenerator(cluster);
491    return Pair.newPair(generator, generator.generate(cluster));
492  }
493
494  /**
495   * Select the candidate generator to use based on the cost of cost functions. The chance of
496   * selecting a candidate generator is proportional to the share of cost of all cost functions
497   * among all cost functions that benefit from it.
498   */
499  protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) {
500    // Prefer conditional generators if they have moves to make
501    if (balancerConditionals.isConditionalBalancingEnabled()) {
502      for (RegionPlanConditional conditional : balancerConditionals.getConditionals()) {
503        List<RegionPlanConditionalCandidateGenerator> generators =
504          conditional.getCandidateGenerators();
505        for (RegionPlanConditionalCandidateGenerator generator : generators) {
506          if (generator.getWeight(cluster) > 0) {
507            return generator;
508          }
509        }
510      }
511    }
512
513    List<Class<? extends CandidateGenerator>> generatorClasses = shuffledGeneratorClasses.get();
514    List<Double> partialSums = new ArrayList<>(generatorClasses.size());
515    double sum = 0.0;
516    for (Class<? extends CandidateGenerator> clazz : generatorClasses) {
517      double weight = weightsOfGenerators.getOrDefault(clazz, 0.0);
518      sum += weight;
519      partialSums.add(sum);
520    }
521
522    // If the sum of all weights is zero, fall back to any generator
523    if (sum == 0.0) {
524      return pickAnyGenerator(generatorClasses);
525    }
526
527    double rand = ThreadLocalRandom.current().nextDouble();
528    // Normalize partial sums so that the last one should be exactly 1.0
529    for (int i = 0; i < partialSums.size(); i++) {
530      partialSums.set(i, partialSums.get(i) / sum);
531    }
532
533    // Generate a random number and pick the first generator whose partial sum is >= rand
534    for (int i = 0; i < partialSums.size(); i++) {
535      if (rand <= partialSums.get(i)) {
536        return candidateGenerators.get(generatorClasses.get(i));
537      }
538    }
539
540    // Fallback: if for some reason we didn't return above, return any generator
541    return pickAnyGenerator(generatorClasses);
542  }
543
544  private CandidateGenerator
545    pickAnyGenerator(List<Class<? extends CandidateGenerator>> generatorClasses) {
546    Class<? extends CandidateGenerator> randomClass =
547      generatorClasses.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
548    return candidateGenerators.get(randomClass);
549  }
550
551  @RestrictedApi(explanation = "Should only be called in tests", link = "",
552      allowedOnPath = ".*/src/test/.*")
553  void setRackManager(RackManager rackManager) {
554    this.rackManager = rackManager;
555  }
556
557  private long calculateMaxSteps(BalancerClusterState cluster) {
558    return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers;
559  }
560
561  /**
562   * Given the cluster state this will try and approach an optimal balance. This should always
563   * approach the optimal state given enough steps.
564   */
565  @Override
566  protected List<RegionPlan> balanceTable(TableName tableName,
567    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
568    // On clusters with lots of HFileLinks or lots of reference files,
569    // instantiating the storefile infos can be quite expensive.
570    // Allow turning this feature off if the locality cost is not going to
571    // be used in any computations.
572    RegionHDFSBlockLocationFinder finder = null;
573    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
574      finder = this.regionFinder;
575    }
576
577    // The clusterState that is given to this method contains the state
578    // of all the regions in the table(s) (that's true today)
579    // Keep track of servers to iterate through them.
580    BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder,
581      rackManager, regionCacheRatioOnOldServerMap);
582
583    long startTime = EnvironmentEdgeManager.currentTime();
584    cluster.setStopRequestedAt(startTime + maxRunningTime);
585
586    initCosts(cluster);
587    balancerConditionals.loadClusterState(cluster);
588    balancerConditionals.clearConditionalWeightCaches();
589
590    float localSumMultiplier = 0;
591    for (CostFunction c : costFunctions) {
592      if (c.isNeeded()) {
593        localSumMultiplier += c.getMultiplier();
594      }
595    }
596    sumMultiplier = localSumMultiplier;
597    if (sumMultiplier <= 0) {
598      LOG.error("At least one cost function needs a multiplier > 0. For example, set "
599        + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default");
600      return null;
601    }
602
603    double currentCost = computeCost(cluster, Double.MAX_VALUE);
604    curOverallCost = currentCost;
605    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
606    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
607    double initCost = currentCost;
608    double newCost;
609
610    if (!needsBalance(tableName, cluster)) {
611      return null;
612    }
613
614    long computedMaxSteps;
615    if (runMaxSteps) {
616      computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster));
617    } else {
618      long calculatedMaxSteps = calculateMaxSteps(cluster);
619      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
620      if (calculatedMaxSteps > maxSteps) {
621        LOG.warn(
622          "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
623            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
624            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
625            + "(This config change does not require service restart)",
626          calculatedMaxSteps, maxSteps);
627      }
628    }
629    LOG.info(
630      "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, "
631        + "functionCost={} computedMaxSteps={}",
632      currentCost / sumMultiplier, functionCost(), computedMaxSteps);
633
634    final String initFunctionTotalCosts = totalCostsPerFunc();
635    // Perform a stochastic walk to see if we can get a good fit.
636    long step;
637    boolean planImprovedConditionals = false;
638    Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new HashMap<>();
639    Map<Class<? extends CandidateGenerator>, Long> generatorToApprovedActionCount = new HashMap<>();
640    for (step = 0; step < computedMaxSteps; step++) {
641      Pair<CandidateGenerator, BalanceAction> nextAction = nextAction(cluster);
642      CandidateGenerator generator = nextAction.getFirst();
643      BalanceAction action = nextAction.getSecond();
644
645      if (action.getType() == BalanceAction.Type.NULL) {
646        continue;
647      }
648
649      int conditionalViolationsChange = 0;
650      boolean isViolatingConditionals = false;
651      boolean moveImprovedConditionals = false;
652      // Only check conditionals if they are enabled
653      if (balancerConditionals.isConditionalBalancingEnabled()) {
654        // Always accept a conditional generator output. Sometimes conditional generators
655        // may need to make controversial moves in order to break what would otherwise
656        // be a deadlocked situation.
657        // Otherwise, for normal moves, evaluate the action.
658        if (RegionPlanConditionalCandidateGenerator.class.isAssignableFrom(generator.getClass())) {
659          conditionalViolationsChange = -1;
660        } else {
661          conditionalViolationsChange =
662            balancerConditionals.getViolationCountChange(cluster, action);
663          isViolatingConditionals = balancerConditionals.isViolating(cluster, action);
664        }
665        moveImprovedConditionals = conditionalViolationsChange < 0;
666        if (moveImprovedConditionals) {
667          planImprovedConditionals = true;
668        }
669      }
670
671      // Change state and evaluate costs
672      try {
673        cluster.doAction(action);
674      } catch (IllegalStateException | ArrayIndexOutOfBoundsException e) {
675        LOG.warn(
676          "Generator {} produced invalid action! "
677            + "Debug your candidate generator as this is likely a bug, "
678            + "and may cause a balancer deadlock. {}",
679          generator.getClass().getSimpleName(), action, e);
680        continue;
681      }
682      updateCostsAndWeightsWithAction(cluster, action);
683      generatorToStepCount.merge(generator.getClass(), action.getStepCount(), Long::sum);
684
685      newCost = computeCost(cluster, currentCost);
686
687      double costImprovement = currentCost - newCost;
688      double minimumImprovement =
689        Math.max(CostFunction.getCostEpsilon(currentCost), CostFunction.getCostEpsilon(newCost));
690      boolean costsImproved = costImprovement > minimumImprovement;
691      boolean conditionalsSimilarCostsImproved =
692        (costsImproved && conditionalViolationsChange == 0 && !isViolatingConditionals);
693      // Our first priority is to reduce conditional violations
694      // Our second priority is to reduce balancer cost
695      // change, regardless of cost change
696      if (moveImprovedConditionals || conditionalsSimilarCostsImproved) {
697        currentCost = newCost;
698        generatorToApprovedActionCount.merge(generator.getClass(), action.getStepCount(),
699          Long::sum);
700
701        // save for JMX
702        curOverallCost = currentCost;
703        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
704      } else {
705        // Put things back the way they were before.
706        // TODO: undo by remembering old values
707        BalanceAction undoAction = action.undoAction();
708        cluster.doAction(undoAction);
709        updateCostsAndWeightsWithAction(cluster, undoAction);
710      }
711
712      if (cluster.isStopRequested()) {
713        break;
714      }
715    }
716    long endTime = EnvironmentEdgeManager.currentTime();
717
718    StringJoiner joiner = new StringJoiner("\n");
719    joiner.add("CandidateGenerator activity summary:");
720    generatorToStepCount.forEach((generator, count) -> {
721      long approvals = generatorToApprovedActionCount.getOrDefault(generator, 0L);
722      joiner.add(String.format(" - %s: %d steps, %d approvals", generator.getSimpleName(), count,
723        approvals));
724    });
725    LOG.debug(joiner.toString());
726
727    metricsBalancer.balanceCluster(endTime - startTime);
728
729    if (planImprovedConditionals || (initCost > currentCost)) {
730      updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
731      List<RegionPlan> plans = createRegionPlans(cluster);
732      LOG.info(
733        "Finished computing new moving plan. Computation took {} ms"
734          + " to try {} different iterations.  Found a solution that moves "
735          + "{} regions; Going from a computed imbalance of {}"
736          + " to a new imbalance of {}. funtionCost={}",
737        endTime - startTime, step, plans.size(), initCost / sumMultiplier,
738        currentCost / sumMultiplier, functionCost());
739      sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
740      return plans;
741    }
742    LOG.info(
743      "Could not find a better moving plan.  Tried {} different configurations in "
744        + "{} ms, and did not find anything with an imbalance score less than {} "
745        + "and could not improve conditional violations",
746      step, endTime - startTime, initCost / sumMultiplier);
747    return null;
748  }
749
750  protected void sendRejectionReasonToRingBuffer(Supplier<String> reason,
751    List<CostFunction> costFunctions) {
752    provider.recordBalancerRejection(() -> {
753      BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get());
754      if (costFunctions != null) {
755        for (CostFunction c : costFunctions) {
756          if (!c.isNeeded()) {
757            continue;
758          }
759          builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
760        }
761      }
762      return builder.build();
763    });
764  }
765
766  private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
767    double initCost, String initFunctionTotalCosts, long step) {
768    provider.recordBalancerDecision(() -> {
769      List<String> regionPlans = new ArrayList<>();
770      for (RegionPlan plan : plans) {
771        regionPlans
772          .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
773            + " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
774      }
775      return new BalancerDecision.Builder().setInitTotalCost(initCost)
776        .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost)
777        .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step)
778        .setRegionPlans(regionPlans).build();
779    });
780  }
781
782  /**
783   * update costs to JMX
784   */
785  private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) {
786    if (tableName == null) {
787      return;
788    }
789
790    // check if the metricsBalancer is MetricsStochasticBalancer before casting
791    if (metricsBalancer instanceof MetricsStochasticBalancer) {
792      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
793      // overall cost
794      balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME,
795        "Overall cost", overall);
796
797      // each cost function
798      for (int i = 0; i < costFunctions.size(); i++) {
799        CostFunction costFunction = costFunctions.get(i);
800        String costFunctionName = costFunction.getClass().getSimpleName();
801        double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
802        // TODO: cost function may need a specific description
803        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
804          "The percent of " + costFunctionName, costPercent);
805      }
806    }
807  }
808
809  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
810    float multiplier = costFunction.getMultiplier();
811    if (multiplier > 0) {
812      costFunctions.add(costFunction);
813    }
814  }
815
816  protected String functionCost() {
817    StringBuilder builder = new StringBuilder();
818    for (CostFunction c : costFunctions) {
819      builder.append(c.getClass().getSimpleName());
820      builder.append(" : (");
821      if (c.isNeeded()) {
822        builder.append("multiplier=" + c.getMultiplier());
823        builder.append(", ");
824        double cost = c.cost();
825        builder.append("imbalance=" + cost);
826        if (cost >= minCostNeedBalance) {
827          builder.append(", need balance");
828        }
829      } else {
830        builder.append("not needed");
831      }
832      builder.append("); ");
833    }
834    return builder.toString();
835  }
836
837  @RestrictedApi(explanation = "Should only be called in tests", link = "",
838      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
839  List<CostFunction> getCostFunctions() {
840    return costFunctions;
841  }
842
843  private String totalCostsPerFunc() {
844    StringBuilder builder = new StringBuilder();
845    for (CostFunction c : costFunctions) {
846      if (!c.isNeeded()) {
847        continue;
848      }
849      double cost = c.getMultiplier() * c.cost();
850      if (cost > 0.0) {
851        builder.append(" ");
852        builder.append(c.getClass().getSimpleName());
853        builder.append(" : ");
854        builder.append(cost);
855        builder.append(";");
856      }
857    }
858    if (builder.length() > 0) {
859      builder.deleteCharAt(builder.length() - 1);
860    }
861    return builder.toString();
862  }
863
864  /**
865   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
866   * state.
867   * @param cluster The state of the cluster
868   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
869   */
870  private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
871    List<RegionPlan> plans = new ArrayList<>();
872    for (int regionIndex = 0; regionIndex
873        < cluster.regionIndexToServerIndex.length; regionIndex++) {
874      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
875      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
876
877      if (initialServerIndex != newServerIndex) {
878        RegionInfo region = cluster.regions[regionIndex];
879        ServerName initialServer = cluster.servers[initialServerIndex];
880        ServerName newServer = cluster.servers[newServerIndex];
881
882        if (LOG.isTraceEnabled()) {
883          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
884            + initialServer.getHostname() + " to " + newServer.getHostname());
885        }
886        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
887        plans.add(rp);
888      }
889    }
890    return plans;
891  }
892
893  /**
894   * Store the current region loads.
895   */
896  private void updateRegionLoad() {
897    // We create a new hashmap so that regions that are no longer there are removed.
898    // However we temporarily need the old loads so we can use them to keep the rolling average.
899    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
900    loads = new HashMap<>();
901
902    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
903      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
904        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
905        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
906        if (rLoads == null) {
907          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
908        } else if (rLoads.size() >= numRegionLoadsToRemember) {
909          rLoads.remove();
910        }
911        rLoads.add(new BalancerRegionLoad(rm));
912        loads.put(regionNameAsString, rLoads);
913      });
914    });
915  }
916
917  @RestrictedApi(explanation = "Should only be called in tests", link = "",
918      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
919  void initCosts(BalancerClusterState cluster) {
920    weightsOfGenerators.clear();
921    for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
922      weightsOfGenerators.put(clazz, 0.0);
923    }
924    for (CostFunction c : costFunctions) {
925      c.prepare(cluster);
926      c.updateWeight(weightsOfGenerators);
927    }
928  }
929
930  /**
931   * Update both the costs of costfunctions and the weights of candidate generators
932   */
933  @RestrictedApi(explanation = "Should only be called in tests", link = "",
934      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
935  void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
936    // Reset all the weights to 0
937    for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
938      weightsOfGenerators.put(clazz, 0.0);
939    }
940    for (CostFunction c : costFunctions) {
941      if (c.isNeeded()) {
942        c.postAction(action);
943        c.updateWeight(weightsOfGenerators);
944      }
945    }
946  }
947
948  /**
949   * Get the names of the cost functions
950   */
951  @RestrictedApi(explanation = "Should only be called in tests", link = "",
952      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
953  String[] getCostFunctionNames() {
954    String[] ret = new String[costFunctions.size()];
955    for (int i = 0; i < costFunctions.size(); i++) {
956      CostFunction c = costFunctions.get(i);
957      ret[i] = c.getClass().getSimpleName();
958    }
959
960    return ret;
961  }
962
963  /**
964   * This is the main cost function. It will compute a cost associated with a proposed cluster
965   * state. All different costs will be combined with their multipliers to produce a double cost.
966   * @param cluster      The state of the cluster
967   * @param previousCost the previous cost. This is used as an early out.
968   * @return a double of a cost associated with the proposed cluster state. This cost is an
969   *         aggregate of all individual cost functions.
970   */
971  @RestrictedApi(explanation = "Should only be called in tests", link = "",
972      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
973  double computeCost(BalancerClusterState cluster, double previousCost) {
974    double total = 0;
975
976    for (int i = 0; i < costFunctions.size(); i++) {
977      CostFunction c = costFunctions.get(i);
978      this.tempFunctionCosts[i] = 0.0;
979
980      if (!c.isNeeded()) {
981        continue;
982      }
983
984      Float multiplier = c.getMultiplier();
985      double cost = c.cost();
986
987      this.tempFunctionCosts[i] = multiplier * cost;
988      total += this.tempFunctionCosts[i];
989
990      if (total > previousCost) {
991        break;
992      }
993    }
994
995    return total;
996  }
997
998  /**
999   * A helper function to compose the attribute name from tablename and costfunction name
1000   */
1001  static String composeAttributeName(String tableName, String costFunctionName) {
1002    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1003  }
1004}