001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.lang.reflect.Constructor;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.Deque;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.StringJoiner;
031import java.util.concurrent.ThreadLocalRandom;
032import java.util.concurrent.TimeUnit;
033import java.util.function.Supplier;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ClusterMetrics;
036import org.apache.hadoop.hbase.HBaseInterfaceAudience;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.RegionMetrics;
039import org.apache.hadoop.hbase.ServerMetrics;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.BalancerDecision;
043import org.apache.hadoop.hbase.client.BalancerRejection;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.master.RackManager;
046import org.apache.hadoop.hbase.master.RegionPlan;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.hadoop.hbase.util.ReflectionUtils;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
055
056/**
057 * <p>
 * This is a best-effort load balancer. Given a cost function F(C) =&gt; x, it will randomly try to
 * mutate the cluster to Cprime. If F(Cprime) &lt; F(C), then the new cluster state becomes the
 * plan. It includes cost functions to compute the cost of:
061 * </p>
062 * <ul>
063 * <li>Region Load</li>
064 * <li>Table Load</li>
065 * <li>Data Locality</li>
066 * <li>Memstore Sizes</li>
067 * <li>Storefile Sizes</li>
068 * </ul>
069 * <p>
070 * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best
071 * solution, and 1 is the highest possible cost and the worst solution. The computed costs are
072 * scaled by their respective multipliers:
073 * </p>
074 * <ul>
075 * <li>hbase.master.balancer.stochastic.regionLoadCost</li>
076 * <li>hbase.master.balancer.stochastic.moveCost</li>
077 * <li>hbase.master.balancer.stochastic.tableLoadCost</li>
078 * <li>hbase.master.balancer.stochastic.localityCost</li>
079 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
080 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
081 * </ul>
082 * <p>
 * You can also add custom cost functions by setting the following configuration value:
084 * </p>
085 * <ul>
086 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
087 * </ul>
088 * <p>
 * All custom cost functions need to extend {@link CostFunction}.
090 * </p>
091 * <p>
092 * In addition to the above configurations, the balancer can be tuned by the following configuration
093 * values:
094 * </p>
095 * <ul>
 * <li>hbase.master.balancer.stochastic.maxMoveRegions, which controls the maximum number of regions
 * that can be moved in a single invocation of this balancer.</li>
 * <li>hbase.master.balancer.stochastic.stepsPerRegion, the coefficient by which the number of
 * regions is multiplied to determine the number of times the balancer will mutate all
 * servers.</li>
 * <li>hbase.master.balancer.stochastic.maxSteps, which controls the maximum number of times that
 * the balancer will try to mutate all the servers. The balancer will use the minimum of this value
 * and the above computation, or the maximum of the two when
 * hbase.master.balancer.stochastic.runMaxSteps is set to true.</li>
104 * </ul>
105 * <p>
106 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the
107 * balancer gets the full picture of all loads on the cluster.
108 * </p>
109 */
110@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
111public class StochasticLoadBalancer extends BaseLoadBalancer {
112
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Tuning keys and defaults for the stochastic walk; read in loadConf.
  protected static final String STEPS_PER_REGION_KEY =
    "hbase.master.balancer.stochastic.stepsPerRegion";
  protected static final int DEFAULT_STEPS_PER_REGION = 800;
  protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
  protected static final int DEFAULT_MAX_STEPS = 1000000;
  protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps";
  protected static final boolean DEFAULT_RUN_MAX_STEPS = false;
  protected static final String MAX_RUNNING_TIME_KEY =
    "hbase.master.balancer.stochastic.maxRunningTime";
  protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds.
  protected static final String KEEP_REGION_LOADS =
    "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  protected static final int DEFAULT_KEEP_REGION_LOADS = 15;
  private static final String TABLE_FUNCTION_SEP = "_";
  // Threshold on the weighted average cost below which needsBalance reports "balanced".
  protected static final String MIN_COST_NEED_BALANCE_KEY =
    "hbase.master.balancer.stochastic.minCostNeedBalance";
  protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f;
  // Comma-separated class names of extra cost functions; read by loadCustomCostFunctions.
  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
    "hbase.master.balancer.stochastic.additionalCostFunctions";
  public static final String OVERALL_COST_FUNCTION_NAME = "Overall";

  // Remembered region loads handed to every BalancerClusterState this class builds.
  // NOTE(review): key is presumably a region name; confirm against updateRegionLoad.
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // values are defaults
  private int maxSteps = DEFAULT_MAX_STEPS;
  private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS;
  private int stepsPerRegion = DEFAULT_STEPS_PER_REGION;
  private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
  private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
  private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
  // Passed to the BalancerClusterState built in balanceTable; not populated in this part of the
  // file (TODO confirm where it is filled in).
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>();

  protected List<CostFunction> costFunctions; // FindBugs: Wants this protected;
                                              // IS2_INCONSISTENT_SYNC
  // Sum of the multipliers of the cost functions that were needed in the most recent
  // needsBalance/balanceTable evaluation. It is 0 until the first such evaluation.
  private float sumMultiplier;
  // to save and report costs to JMX
  private double curOverallCost = 0d;
  // Scratch per-function costs; copied into curFunctionCosts whenever a state is accepted.
  private double[] tempFunctionCosts;
  private double[] curFunctionCosts;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

  // Relative selection weight per generator class, consumed by getRandomGenerator.
  private final Map<Class<? extends CandidateGenerator>, Double> weightsOfGenerators =
    new HashMap<>();
  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
  // Shuffled snapshot of candidateGenerators' keys, recomputed at most every 5 seconds.
  // NOTE(review): the snapshot is not invalidated when loadConf replaces candidateGenerators,
  // so for up to 5 seconds it may name classes that are absent from the new map.
  protected final Supplier<List<Class<? extends CandidateGenerator>>> shuffledGeneratorClasses =
    Suppliers.memoizeWithExpiration(() -> {
      List<Class<? extends CandidateGenerator>> shuffled =
        new ArrayList<>(candidateGenerators.keySet());
      Collections.shuffle(shuffled);
      return shuffled;
    }, 5, TimeUnit.SECONDS);

  private final BalancerConditionals balancerConditionals = BalancerConditionals.create();
176
  /**
   * Creates a balancer that passes a new {@link MetricsStochasticBalancer} to BaseLoadBalancer in
   * place of its default MetricsBalancer.
   */
  public StochasticLoadBalancer() {
    super(new MetricsStochasticBalancer());
  }

  /**
   * Creates a balancer that reports through the supplied metrics instance. Restricted to tests.
   * @param metricsStochasticBalancer metrics instance handed to BaseLoadBalancer
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) {
    super(metricsStochasticBalancer);
  }
190
191  private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
192    Configuration conf) {
193    try {
194      Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class);
195      return ReflectionUtils.instantiate(clazz.getName(), ctor, conf);
196    } catch (NoSuchMethodException e) {
197      // will try construct with no parameter
198    }
199    return ReflectionUtils.newInstance(clazz);
200  }
201
202  private void loadCustomCostFunctions(Configuration conf) {
203    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
204
205    if (null == functionsNames) {
206      return;
207    }
208    for (String className : functionsNames) {
209      Class<? extends CostFunction> clazz;
210      try {
211        clazz = Class.forName(className).asSubclass(CostFunction.class);
212      } catch (ClassNotFoundException e) {
213        LOG.warn("Cannot load class '{}': {}", className, e.getMessage());
214        continue;
215      }
216      CostFunction func = createCostFunction(clazz, conf);
217      LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
218      costFunctions.add(func);
219    }
220  }
221
222  @RestrictedApi(explanation = "Should only be called in tests", link = "",
223      allowedOnPath = ".*/src/test/.*")
224  Map<Class<? extends CandidateGenerator>, CandidateGenerator> getCandidateGenerators() {
225    return this.candidateGenerators;
226  }
227
228  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
229    createCandidateGenerators(Configuration conf) {
230    balancerConditionals.setConf(conf);
231    Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
232    if (balancerConditionals.isReplicaDistributionEnabled()) {
233      candidateGenerators = new HashMap<>(3);
234      candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator());
235      candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator());
236      candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator);
237    } else {
238      candidateGenerators = new HashMap<>(5);
239      candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator());
240      candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator());
241      candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator);
242      candidateGenerators.put(RegionReplicaCandidateGenerator.class,
243        new RegionReplicaCandidateGenerator());
244      candidateGenerators.put(RegionReplicaRackCandidateGenerator.class,
245        new RegionReplicaRackCandidateGenerator());
246    }
247    return candidateGenerators;
248  }
249
250  protected List<CostFunction> createCostFunctions(Configuration conf) {
251    List<CostFunction> costFunctions = new ArrayList<>();
252    addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf));
253    addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf));
254    addCostFunction(costFunctions, new MoveCostFunction(conf, provider));
255    addCostFunction(costFunctions, localityCost);
256    addCostFunction(costFunctions, rackLocalityCost);
257    addCostFunction(costFunctions, new TableSkewCostFunction(conf));
258    addCostFunction(costFunctions, new StoreFileTableSkewCostFunction(conf));
259    addCostFunction(costFunctions, regionReplicaHostCostFunction);
260    addCostFunction(costFunctions, regionReplicaRackCostFunction);
261    addCostFunction(costFunctions, new ReadRequestCostFunction(conf));
262    addCostFunction(costFunctions, new CPRequestCostFunction(conf));
263    addCostFunction(costFunctions, new WriteRequestCostFunction(conf));
264    addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf));
265    addCostFunction(costFunctions, new StoreFileCostFunction(conf));
266    return costFunctions;
267  }
268
269  @Override
270  protected void loadConf(Configuration conf) {
271    super.loadConf(conf);
272    maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS);
273    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION);
274    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME);
275    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS);
276
277    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS);
278    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE);
279    localityCandidateGenerator = new LocalityBasedCandidateGenerator();
280    localityCost = new ServerLocalityCostFunction(conf);
281    rackLocalityCost = new RackLocalityCostFunction(conf);
282
283    balancerConditionals.setConf(conf);
284    this.candidateGenerators = createCandidateGenerators(conf);
285
286    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
287    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
288    this.costFunctions = createCostFunctions(conf);
289    loadCustomCostFunctions(conf);
290
291    curFunctionCosts = new double[costFunctions.size()];
292    tempFunctionCosts = new double[costFunctions.size()];
293
294    LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps
295      + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable="
296      + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames())
297      + " , sum of multiplier of cost functions = " + sumMultiplier + " etc.");
298  }
299
300  @Override
301  public void updateClusterMetrics(ClusterMetrics st) {
302    super.updateClusterMetrics(st);
303    updateRegionLoad();
304
305    // update metrics size
306    try {
307      // by-table or ensemble mode
308      int tablesCount = isByTable ? provider.getNumberOfTables() : 1;
309      int functionsCount = getCostFunctionNames().length;
310
311      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
312    } catch (Exception e) {
313      LOG.error("failed to get the size of all tables", e);
314    }
315  }
316
317  private void updateBalancerTableLoadInfo(TableName tableName,
318    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
319    RegionHDFSBlockLocationFinder finder = null;
320    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
321      finder = this.regionFinder;
322    }
323    BalancerClusterState cluster =
324      new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
325
326    initCosts(cluster);
327    curOverallCost = computeCost(cluster, Double.MAX_VALUE);
328    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
329    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
330  }
331
332  @Override
333  public void
334    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
335    if (isByTable) {
336      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
337        updateBalancerTableLoadInfo(tableName, loadOfOneTable);
338      });
339    } else {
340      updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME,
341        toEnsumbleTableLoad(loadOfAllTable));
342    }
343  }
344
345  /**
346   * Update the number of metrics that are reported to JMX
347   */
348  @RestrictedApi(explanation = "Should only be called in tests", link = "",
349      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
350  void updateMetricsSize(int size) {
351    if (metricsBalancer instanceof MetricsStochasticBalancer) {
352      ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
353    }
354  }
355
356  private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c) {
357    if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) {
358      // This check is unnecessary without replicas, or with conditional replica distribution
359      // The balancer will auto-run if conditional replica distribution candidates are available
360      return false;
361    }
362    if (c.numHosts >= c.maxReplicas) {
363      regionReplicaHostCostFunction.prepare(c);
364      double hostCost = Math.abs(regionReplicaHostCostFunction.cost());
365      boolean colocatedAtHost = hostCost > CostFunction.getCostEpsilon(hostCost);
366      if (colocatedAtHost) {
367        return true;
368      }
369      LOG.trace("No host colocation detected with host cost={}", hostCost);
370    }
371    return false;
372  }
373
374  private boolean areSomeRegionReplicasColocatedOnRack(BalancerClusterState c) {
375    if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) {
376      // This check is unnecessary without replicas, or with conditional replica distribution
377      // The balancer will auto-run if conditional replica distribution candidates are available
378      return false;
379    }
380    if (c.numRacks >= c.maxReplicas) {
381      regionReplicaRackCostFunction.prepare(c);
382      double rackCost = Math.abs(regionReplicaRackCostFunction.cost());
383      boolean colocatedAtRack = rackCost > CostFunction.getCostEpsilon(rackCost);
384      if (colocatedAtRack) {
385        return true;
386      }
387      LOG.trace("No rack colocation detected with rack cost={}", rackCost);
388    } else {
389      LOG.trace("Rack colocation is inevitable with fewer racks than replicas, "
390        + "so we won't bother checking");
391    }
392    return false;
393  }
394
395  private String getBalanceReason(double total, double sumMultiplier) {
396    if (total <= 0) {
397      return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
398    } else if (sumMultiplier <= 0) {
399      return "sumMultiplier = " + sumMultiplier + " <= 0";
400    } else if ((total / sumMultiplier) < minCostNeedBalance) {
401      return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = "
402        + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
403    } else {
404      return "";
405    }
406  }
407
408  @RestrictedApi(explanation = "Should only be called in tests", link = "",
409      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
410  boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
411    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
412    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
413      LOG.info(
414        "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
415      sendRejectionReasonToRingBuffer(() -> "The number of RegionServers " + cs.getNumServers()
416        + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
417      return false;
418    }
419
420    if (areSomeRegionReplicasColocatedOnHost(cluster)) {
421      LOG.info("Running balancer because at least one server hosts replicas of the same region."
422        + " function cost={}", functionCost());
423      return true;
424    }
425
426    if (areSomeRegionReplicasColocatedOnRack(cluster)) {
427      LOG.info("Running balancer because at least one rack hosts replicas of the same region."
428        + " function cost={}", functionCost());
429      return true;
430    }
431
432    if (idleRegionServerExist(cluster)) {
433      LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}",
434        functionCost());
435      return true;
436    }
437
438    if (
439      // table isolation is inherently incompatible with naive "sloppy server" checks
440      !balancerConditionals.isTableIsolationEnabled() && sloppyRegionServerExist(cs)
441    ) {
442      LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}",
443        functionCost());
444      return true;
445    }
446
447    if (balancerConditionals.shouldRunBalancer(cluster)) {
448      LOG.info("Running balancer because conditional candidate generators have important moves");
449      return true;
450    }
451
452    double total = 0.0;
453    float localSumMultiplier = 0; // in case this.sumMultiplier is not initialized
454    for (CostFunction c : costFunctions) {
455      if (!c.isNeeded()) {
456        LOG.trace("{} not needed", c.getClass().getSimpleName());
457        continue;
458      }
459      total += c.cost() * c.getMultiplier();
460      localSumMultiplier += c.getMultiplier();
461    }
462    sumMultiplier = localSumMultiplier;
463    boolean balanced = (total / sumMultiplier < minCostNeedBalance);
464
465    if (balanced) {
466      final double calculatedTotal = total;
467      sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, sumMultiplier),
468        costFunctions);
469      LOG.info(
470        "{} - skipping load balancing because weighted average imbalance={} <= "
471          + "threshold({}) and conditionals do not have opinionated move candidates. "
472          + "If you want more aggressive balancing, either lower "
473          + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative "
474          + "multiplier(s) of the specific cost function(s). functionCost={}",
475        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier,
476        minCostNeedBalance, minCostNeedBalance, functionCost());
477    } else {
478      LOG.info(
479        "{} - Calculating plan. may take up to {}ms to complete. currentCost={}, targetCost={}",
480        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime, total,
481        minCostNeedBalance);
482    }
483    return !balanced;
484  }
485
486  @RestrictedApi(explanation = "Should only be called in tests", link = "",
487      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
488  Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState cluster) {
489    CandidateGenerator generator = getRandomGenerator(cluster);
490    return Pair.newPair(generator, generator.generate(cluster));
491  }
492
493  /**
494   * Select the candidate generator to use based on the cost of cost functions. The chance of
495   * selecting a candidate generator is proportional to the share of cost of all cost functions
496   * among all cost functions that benefit from it.
497   */
498  protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) {
499    // Prefer conditional generators if they have moves to make
500    if (balancerConditionals.isConditionalBalancingEnabled()) {
501      for (RegionPlanConditional conditional : balancerConditionals.getConditionals()) {
502        List<RegionPlanConditionalCandidateGenerator> generators =
503          conditional.getCandidateGenerators();
504        for (RegionPlanConditionalCandidateGenerator generator : generators) {
505          if (generator.getWeight(cluster) > 0) {
506            return generator;
507          }
508        }
509      }
510    }
511
512    List<Class<? extends CandidateGenerator>> generatorClasses = shuffledGeneratorClasses.get();
513    List<Double> partialSums = new ArrayList<>(generatorClasses.size());
514    double sum = 0.0;
515    for (Class<? extends CandidateGenerator> clazz : generatorClasses) {
516      double weight = weightsOfGenerators.getOrDefault(clazz, 0.0);
517      sum += weight;
518      partialSums.add(sum);
519    }
520
521    // If the sum of all weights is zero, fall back to any generator
522    if (sum == 0.0) {
523      return pickAnyGenerator(generatorClasses);
524    }
525
526    double rand = ThreadLocalRandom.current().nextDouble();
527    // Normalize partial sums so that the last one should be exactly 1.0
528    for (int i = 0; i < partialSums.size(); i++) {
529      partialSums.set(i, partialSums.get(i) / sum);
530    }
531
532    // Generate a random number and pick the first generator whose partial sum is >= rand
533    for (int i = 0; i < partialSums.size(); i++) {
534      if (rand <= partialSums.get(i)) {
535        return candidateGenerators.get(generatorClasses.get(i));
536      }
537    }
538
539    // Fallback: if for some reason we didn't return above, return any generator
540    return pickAnyGenerator(generatorClasses);
541  }
542
543  private CandidateGenerator
544    pickAnyGenerator(List<Class<? extends CandidateGenerator>> generatorClasses) {
545    Class<? extends CandidateGenerator> randomClass =
546      generatorClasses.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
547    return candidateGenerators.get(randomClass);
548  }
549
550  @RestrictedApi(explanation = "Should only be called in tests", link = "",
551      allowedOnPath = ".*/src/test/.*")
552  void setRackManager(RackManager rackManager) {
553    this.rackManager = rackManager;
554  }
555
556  private long calculateMaxSteps(BalancerClusterState cluster) {
557    return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers;
558  }
559
560  /**
561   * Given the cluster state this will try and approach an optimal balance. This should always
562   * approach the optimal state given enough steps.
563   */
564  @Override
565  protected List<RegionPlan> balanceTable(TableName tableName,
566    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
567    // On clusters with lots of HFileLinks or lots of reference files,
568    // instantiating the storefile infos can be quite expensive.
569    // Allow turning this feature off if the locality cost is not going to
570    // be used in any computations.
571    RegionHDFSBlockLocationFinder finder = null;
572    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
573      finder = this.regionFinder;
574    }
575
576    // The clusterState that is given to this method contains the state
577    // of all the regions in the table(s) (that's true today)
578    // Keep track of servers to iterate through them.
579    BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder,
580      rackManager, regionCacheRatioOnOldServerMap);
581
582    long startTime = EnvironmentEdgeManager.currentTime();
583    cluster.setStopRequestedAt(startTime + maxRunningTime);
584
585    initCosts(cluster);
586    balancerConditionals.loadClusterState(cluster);
587    balancerConditionals.clearConditionalWeightCaches();
588
589    float localSumMultiplier = 0;
590    for (CostFunction c : costFunctions) {
591      if (c.isNeeded()) {
592        localSumMultiplier += c.getMultiplier();
593      }
594    }
595    sumMultiplier = localSumMultiplier;
596    if (sumMultiplier <= 0) {
597      LOG.error("At least one cost function needs a multiplier > 0. For example, set "
598        + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default");
599      return null;
600    }
601
602    double currentCost = computeCost(cluster, Double.MAX_VALUE);
603    curOverallCost = currentCost;
604    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
605    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
606    double initCost = currentCost;
607    double newCost;
608
609    if (!needsBalance(tableName, cluster)) {
610      return null;
611    }
612
613    long computedMaxSteps;
614    if (runMaxSteps) {
615      computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster));
616    } else {
617      long calculatedMaxSteps = calculateMaxSteps(cluster);
618      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
619      if (calculatedMaxSteps > maxSteps) {
620        LOG.warn(
621          "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
622            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
623            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
624            + "(This config change does not require service restart)",
625          calculatedMaxSteps, maxSteps);
626      }
627    }
628    LOG.info(
629      "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, "
630        + "functionCost={} computedMaxSteps={}",
631      currentCost / sumMultiplier, functionCost(), computedMaxSteps);
632
633    final String initFunctionTotalCosts = totalCostsPerFunc();
634    // Perform a stochastic walk to see if we can get a good fit.
635    long step;
636    boolean planImprovedConditionals = false;
637    Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new HashMap<>();
638    Map<Class<? extends CandidateGenerator>, Long> generatorToApprovedActionCount = new HashMap<>();
639    for (step = 0; step < computedMaxSteps; step++) {
640      Pair<CandidateGenerator, BalanceAction> nextAction = nextAction(cluster);
641      CandidateGenerator generator = nextAction.getFirst();
642      BalanceAction action = nextAction.getSecond();
643
644      if (action.getType() == BalanceAction.Type.NULL) {
645        continue;
646      }
647
648      int conditionalViolationsChange = 0;
649      boolean isViolatingConditionals = false;
650      boolean moveImprovedConditionals = false;
651      // Only check conditionals if they are enabled
652      if (balancerConditionals.isConditionalBalancingEnabled()) {
653        // Always accept a conditional generator output. Sometimes conditional generators
654        // may need to make controversial moves in order to break what would otherwise
655        // be a deadlocked situation.
656        // Otherwise, for normal moves, evaluate the action.
657        if (RegionPlanConditionalCandidateGenerator.class.isAssignableFrom(generator.getClass())) {
658          conditionalViolationsChange = -1;
659        } else {
660          conditionalViolationsChange =
661            balancerConditionals.getViolationCountChange(cluster, action);
662          isViolatingConditionals = balancerConditionals.isViolating(cluster, action);
663        }
664        moveImprovedConditionals = conditionalViolationsChange < 0;
665        if (moveImprovedConditionals) {
666          planImprovedConditionals = true;
667        }
668      }
669
670      // Change state and evaluate costs
671      try {
672        cluster.doAction(action);
673      } catch (IllegalStateException | ArrayIndexOutOfBoundsException e) {
674        LOG.warn(
675          "Generator {} produced invalid action! "
676            + "Debug your candidate generator as this is likely a bug, "
677            + "and may cause a balancer deadlock. {}",
678          generator.getClass().getSimpleName(), action, e);
679        continue;
680      }
681      updateCostsAndWeightsWithAction(cluster, action);
682      generatorToStepCount.merge(generator.getClass(), action.getStepCount(), Long::sum);
683
684      newCost = computeCost(cluster, currentCost);
685
686      boolean conditionalsSimilarCostsImproved =
687        (newCost < currentCost && conditionalViolationsChange == 0 && !isViolatingConditionals);
688      // Our first priority is to reduce conditional violations
689      // Our second priority is to reduce balancer cost
690      // change, regardless of cost change
691      if (moveImprovedConditionals || conditionalsSimilarCostsImproved) {
692        currentCost = newCost;
693        generatorToApprovedActionCount.merge(generator.getClass(), action.getStepCount(),
694          Long::sum);
695
696        // save for JMX
697        curOverallCost = currentCost;
698        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
699      } else {
700        // Put things back the way they were before.
701        // TODO: undo by remembering old values
702        BalanceAction undoAction = action.undoAction();
703        cluster.doAction(undoAction);
704        updateCostsAndWeightsWithAction(cluster, undoAction);
705      }
706
707      if (cluster.isStopRequested()) {
708        break;
709      }
710    }
711    long endTime = EnvironmentEdgeManager.currentTime();
712
713    StringJoiner joiner = new StringJoiner("\n");
714    joiner.add("CandidateGenerator activity summary:");
715    generatorToStepCount.forEach((generator, count) -> {
716      long approvals = generatorToApprovedActionCount.getOrDefault(generator, 0L);
717      joiner.add(String.format(" - %s: %d steps, %d approvals", generator.getSimpleName(), count,
718        approvals));
719    });
720    LOG.debug(joiner.toString());
721
722    metricsBalancer.balanceCluster(endTime - startTime);
723
724    if (planImprovedConditionals || (initCost > currentCost)) {
725      updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
726      List<RegionPlan> plans = createRegionPlans(cluster);
727      LOG.info(
728        "Finished computing new moving plan. Computation took {} ms"
729          + " to try {} different iterations.  Found a solution that moves "
730          + "{} regions; Going from a computed imbalance of {}"
731          + " to a new imbalance of {}. funtionCost={}",
732        endTime - startTime, step, plans.size(), initCost / sumMultiplier,
733        currentCost / sumMultiplier, functionCost());
734      sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
735      return plans;
736    }
737    LOG.info(
738      "Could not find a better moving plan.  Tried {} different configurations in "
739        + "{} ms, and did not find anything with an imbalance score less than {} "
740        + "and could not improve conditional violations",
741      step, endTime - startTime, initCost / sumMultiplier);
742    return null;
743  }
744
745  protected void sendRejectionReasonToRingBuffer(Supplier<String> reason,
746    List<CostFunction> costFunctions) {
747    provider.recordBalancerRejection(() -> {
748      BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get());
749      if (costFunctions != null) {
750        for (CostFunction c : costFunctions) {
751          if (!c.isNeeded()) {
752            continue;
753          }
754          builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
755        }
756      }
757      return builder.build();
758    });
759  }
760
761  private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
762    double initCost, String initFunctionTotalCosts, long step) {
763    provider.recordBalancerDecision(() -> {
764      List<String> regionPlans = new ArrayList<>();
765      for (RegionPlan plan : plans) {
766        regionPlans
767          .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
768            + " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
769      }
770      return new BalancerDecision.Builder().setInitTotalCost(initCost)
771        .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost)
772        .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step)
773        .setRegionPlans(regionPlans).build();
774    });
775  }
776
777  /**
778   * update costs to JMX
779   */
780  private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) {
781    if (tableName == null) {
782      return;
783    }
784
785    // check if the metricsBalancer is MetricsStochasticBalancer before casting
786    if (metricsBalancer instanceof MetricsStochasticBalancer) {
787      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
788      // overall cost
789      balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME,
790        "Overall cost", overall);
791
792      // each cost function
793      for (int i = 0; i < costFunctions.size(); i++) {
794        CostFunction costFunction = costFunctions.get(i);
795        String costFunctionName = costFunction.getClass().getSimpleName();
796        double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
797        // TODO: cost function may need a specific description
798        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
799          "The percent of " + costFunctionName, costPercent);
800      }
801    }
802  }
803
804  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
805    float multiplier = costFunction.getMultiplier();
806    if (multiplier > 0) {
807      costFunctions.add(costFunction);
808    }
809  }
810
811  protected String functionCost() {
812    StringBuilder builder = new StringBuilder();
813    for (CostFunction c : costFunctions) {
814      builder.append(c.getClass().getSimpleName());
815      builder.append(" : (");
816      if (c.isNeeded()) {
817        builder.append("multiplier=" + c.getMultiplier());
818        builder.append(", ");
819        double cost = c.cost();
820        builder.append("imbalance=" + cost);
821        if (cost >= minCostNeedBalance) {
822          builder.append(", need balance");
823        }
824      } else {
825        builder.append("not needed");
826      }
827      builder.append("); ");
828    }
829    return builder.toString();
830  }
831
832  @RestrictedApi(explanation = "Should only be called in tests", link = "",
833      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
834  List<CostFunction> getCostFunctions() {
835    return costFunctions;
836  }
837
838  private String totalCostsPerFunc() {
839    StringBuilder builder = new StringBuilder();
840    for (CostFunction c : costFunctions) {
841      if (!c.isNeeded()) {
842        continue;
843      }
844      double cost = c.getMultiplier() * c.cost();
845      if (cost > 0.0) {
846        builder.append(" ");
847        builder.append(c.getClass().getSimpleName());
848        builder.append(" : ");
849        builder.append(cost);
850        builder.append(";");
851      }
852    }
853    if (builder.length() > 0) {
854      builder.deleteCharAt(builder.length() - 1);
855    }
856    return builder.toString();
857  }
858
859  /**
860   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
861   * state.
862   * @param cluster The state of the cluster
863   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
864   */
865  private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
866    List<RegionPlan> plans = new ArrayList<>();
867    for (int regionIndex = 0; regionIndex
868        < cluster.regionIndexToServerIndex.length; regionIndex++) {
869      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
870      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
871
872      if (initialServerIndex != newServerIndex) {
873        RegionInfo region = cluster.regions[regionIndex];
874        ServerName initialServer = cluster.servers[initialServerIndex];
875        ServerName newServer = cluster.servers[newServerIndex];
876
877        if (LOG.isTraceEnabled()) {
878          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
879            + initialServer.getHostname() + " to " + newServer.getHostname());
880        }
881        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
882        plans.add(rp);
883      }
884    }
885    return plans;
886  }
887
888  /**
889   * Store the current region loads.
890   */
891  private void updateRegionLoad() {
892    // We create a new hashmap so that regions that are no longer there are removed.
893    // However we temporarily need the old loads so we can use them to keep the rolling average.
894    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
895    loads = new HashMap<>();
896
897    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
898      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
899        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
900        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
901        if (rLoads == null) {
902          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
903        } else if (rLoads.size() >= numRegionLoadsToRemember) {
904          rLoads.remove();
905        }
906        rLoads.add(new BalancerRegionLoad(rm));
907        loads.put(regionNameAsString, rLoads);
908      });
909    });
910  }
911
912  @RestrictedApi(explanation = "Should only be called in tests", link = "",
913      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
914  void initCosts(BalancerClusterState cluster) {
915    weightsOfGenerators.clear();
916    for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
917      weightsOfGenerators.put(clazz, 0.0);
918    }
919    for (CostFunction c : costFunctions) {
920      c.prepare(cluster);
921      c.updateWeight(weightsOfGenerators);
922    }
923  }
924
925  /**
926   * Update both the costs of costfunctions and the weights of candidate generators
927   */
928  @RestrictedApi(explanation = "Should only be called in tests", link = "",
929      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
930  void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
931    // Reset all the weights to 0
932    for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
933      weightsOfGenerators.put(clazz, 0.0);
934    }
935    for (CostFunction c : costFunctions) {
936      if (c.isNeeded()) {
937        c.postAction(action);
938        c.updateWeight(weightsOfGenerators);
939      }
940    }
941  }
942
943  /**
944   * Get the names of the cost functions
945   */
946  @RestrictedApi(explanation = "Should only be called in tests", link = "",
947      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
948  String[] getCostFunctionNames() {
949    String[] ret = new String[costFunctions.size()];
950    for (int i = 0; i < costFunctions.size(); i++) {
951      CostFunction c = costFunctions.get(i);
952      ret[i] = c.getClass().getSimpleName();
953    }
954
955    return ret;
956  }
957
958  /**
959   * This is the main cost function. It will compute a cost associated with a proposed cluster
960   * state. All different costs will be combined with their multipliers to produce a double cost.
961   * @param cluster      The state of the cluster
962   * @param previousCost the previous cost. This is used as an early out.
963   * @return a double of a cost associated with the proposed cluster state. This cost is an
964   *         aggregate of all individual cost functions.
965   */
966  @RestrictedApi(explanation = "Should only be called in tests", link = "",
967      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
968  double computeCost(BalancerClusterState cluster, double previousCost) {
969    double total = 0;
970
971    for (int i = 0; i < costFunctions.size(); i++) {
972      CostFunction c = costFunctions.get(i);
973      this.tempFunctionCosts[i] = 0.0;
974
975      if (!c.isNeeded()) {
976        continue;
977      }
978
979      Float multiplier = c.getMultiplier();
980      double cost = c.cost();
981
982      this.tempFunctionCosts[i] = multiplier * cost;
983      total += this.tempFunctionCosts[i];
984
985      if (total > previousCost) {
986        break;
987      }
988    }
989
990    return total;
991  }
992
993  /**
994   * A helper function to compose the attribute name from tablename and costfunction name
995   */
996  static String composeAttributeName(String tableName, String costFunctionName) {
997    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
998  }
999}