001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.lang.reflect.Constructor;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.Deque;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.StringJoiner;
031import java.util.concurrent.ThreadLocalRandom;
032import java.util.concurrent.TimeUnit;
033import java.util.function.Supplier;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ClusterMetrics;
036import org.apache.hadoop.hbase.HBaseInterfaceAudience;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.RegionMetrics;
039import org.apache.hadoop.hbase.ServerMetrics;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.BalancerDecision;
043import org.apache.hadoop.hbase.client.BalancerRejection;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.master.RackManager;
046import org.apache.hadoop.hbase.master.RegionPlan;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.hadoop.hbase.util.ReflectionUtils;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
055
056/**
057 * <p>
 * This is a best-effort load balancer. Given a cost function F(C) =&gt; x, it will randomly try to
 * mutate the cluster to Cprime. If F(Cprime) &lt; F(C), then the new cluster state becomes the plan.
 * It includes cost functions to compute the cost of:
061 * </p>
062 * <ul>
063 * <li>Region Load</li>
064 * <li>Table Load</li>
065 * <li>Data Locality</li>
066 * <li>Memstore Sizes</li>
067 * <li>Storefile Sizes</li>
068 * </ul>
069 * <p>
070 * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best
071 * solution, and 1 is the highest possible cost and the worst solution. The computed costs are
072 * scaled by their respective multipliers:
073 * </p>
074 * <ul>
075 * <li>hbase.master.balancer.stochastic.regionLoadCost</li>
076 * <li>hbase.master.balancer.stochastic.moveCost</li>
077 * <li>hbase.master.balancer.stochastic.tableLoadCost</li>
078 * <li>hbase.master.balancer.stochastic.localityCost</li>
079 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
080 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
081 * </ul>
082 * <p>
 * You can also add custom cost functions by setting the following configuration value:
084 * </p>
085 * <ul>
086 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
087 * </ul>
088 * <p>
 * All custom cost functions need to extend {@link CostFunction}.
090 * </p>
091 * <p>
092 * In addition to the above configurations, the balancer can be tuned by the following configuration
093 * values:
094 * </p>
095 * <ul>
 * <li>hbase.master.balancer.stochastic.maxMoveRegions which controls the maximum number of regions
 * that can be moved in a single invocation of this balancer.</li>
098 * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
099 * regions is multiplied to try and get the number of times the balancer will mutate all
100 * servers.</li>
 * <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that the
 * balancer will try to mutate all the servers. The balancer will use the minimum of this value and
 * the above computation, or the maximum of the two when
 * hbase.master.balancer.stochastic.runMaxSteps is true.</li>
104 * </ul>
105 * <p>
106 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the
107 * balancer gets the full picture of all loads on the cluster.
108 * </p>
109 */
110@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
111public class StochasticLoadBalancer extends BaseLoadBalancer {
112
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Multiplied by numRegions * numServers to derive the step budget (see calculateMaxSteps()).
  protected static final String STEPS_PER_REGION_KEY =
    "hbase.master.balancer.stochastic.stepsPerRegion";
  protected static final int DEFAULT_STEPS_PER_REGION = 800;
  // Cap on the steps of one stochastic walk. balanceTable() combines it with the calculated
  // budget via min(), or via max() when runMaxSteps is enabled.
  protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
  protected static final int DEFAULT_MAX_STEPS = 1000000;
  protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps";
  protected static final boolean DEFAULT_RUN_MAX_STEPS = false;
  // Wall-clock budget in milliseconds for a single balanceTable() walk.
  protected static final String MAX_RUNNING_TIME_KEY =
    "hbase.master.balancer.stochastic.maxRunningTime";
  protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds.
  // NOTE(review): presumably the number of BalancerRegionLoad samples kept per entry of `loads`;
  // the code that trims those deques is not visible here - confirm.
  protected static final String KEEP_REGION_LOADS =
    "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  protected static final int DEFAULT_KEEP_REGION_LOADS = 15;
  // NOTE(review): not referenced in the visible code; presumably joins table and cost function
  // names in metric keys - confirm.
  private static final String TABLE_FUNCTION_SEP = "_";
  // Weighted average cost below which needsBalance() reports the cluster as balanced.
  protected static final String MIN_COST_NEED_BALANCE_KEY =
    "hbase.master.balancer.stochastic.minCostNeedBalance";
  protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f;
  // Class names of extra CostFunction implementations, loaded by loadCustomCostFunctions().
  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
    "hbase.master.balancer.stochastic.additionalCostFunctions";
  public static final String OVERALL_COST_FUNCTION_NAME = "Overall";

  // Load history handed to every BalancerClusterState built by this class.
  // NOTE(review): keys are presumably region names - confirm against updateRegionLoad().
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // Tunables read in loadConf(); the initializers below are the defaults.
  private int maxSteps = DEFAULT_MAX_STEPS;
  private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS;
  private int stepsPerRegion = DEFAULT_STEPS_PER_REGION;
  private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
  private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
  private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
  // Passed to BalancerClusterState in balanceTable().
  // NOTE(review): presumably region name -> (previous server, cache ratio there) - confirm.
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>();

  // Built-in cost functions (createCostFunctions()) followed by custom ones
  // (loadCustomCostFunctions()); rebuilt on every loadConf().
  protected List<CostFunction> costFunctions; // FindBugs: Wants this protected;
                                              // IS2_INCONSISTENT_SYNC
  // Sum of the multipliers of the cost functions that were needed at the last needsBalance() or
  // balanceTable() evaluation. It has no explicit initializer, so it is 0 until first computed.
  private float sumMultiplier;
  // to save and report costs to JMX
  private double curOverallCost = 0d;
  // Scratch per-function costs, copied into curFunctionCosts whenever a cost evaluation is kept.
  // NOTE(review): presumably written by computeCost() - confirm.
  private double[] tempFunctionCosts;
  private double[] curFunctionCosts;

  // Created in loadConf() and handed to createCandidateGenerators()/createCostFunctions(), so
  // these references point at the same instances used for generation and cost evaluation.
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

  // Selection weight per generator class, read by getRandomGenerator(); absent entries count as 0.
  // NOTE(review): populated outside this view (presumably updateCostsAndWeightsWithAction()).
  private final Map<Class<? extends CandidateGenerator>, Double> weightsOfGenerators =
    new HashMap<>();
  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
  // Randomly ordered copy of candidateGenerators' keys, recomputed at most once every 5 seconds.
  // Because of the caching it can briefly lag behind a map replaced by loadConf().
  protected final Supplier<List<Class<? extends CandidateGenerator>>> shuffledGeneratorClasses =
    Suppliers.memoizeWithExpiration(() -> {
      List<Class<? extends CandidateGenerator>> shuffled =
        new ArrayList<>(candidateGenerators.keySet());
      Collections.shuffle(shuffled);
      return shuffled;
    }, 5, TimeUnit.SECONDS);

  // Optional conditional balancing rules (e.g. replica distribution, table isolation).
  private final BalancerConditionals balancerConditionals = BalancerConditionals.create();
176
  /**
   * Creates a balancer that reports through a fresh {@link MetricsStochasticBalancer}. That
   * instance is passed to {@link BaseLoadBalancer} in place of its default MetricsBalancer.
   */
  public StochasticLoadBalancer() {
    super(new MetricsStochasticBalancer());
  }
184
  /**
   * Test-only constructor that lets the caller supply the metrics implementation.
   * @param metricsStochasticBalancer metrics sink handed to {@link BaseLoadBalancer}
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) {
    super(metricsStochasticBalancer);
  }
190
191  private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
192    Configuration conf) {
193    try {
194      Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class);
195      return ReflectionUtils.instantiate(clazz.getName(), ctor, conf);
196    } catch (NoSuchMethodException e) {
197      // will try construct with no parameter
198    }
199    return ReflectionUtils.newInstance(clazz);
200  }
201
202  private void loadCustomCostFunctions(Configuration conf) {
203    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
204
205    if (null == functionsNames) {
206      return;
207    }
208    for (String className : functionsNames) {
209      Class<? extends CostFunction> clazz;
210      try {
211        clazz = Class.forName(className).asSubclass(CostFunction.class);
212      } catch (ClassNotFoundException e) {
213        LOG.warn("Cannot load class '{}': {}", className, e.getMessage());
214        continue;
215      }
216      CostFunction func = createCostFunction(clazz, conf);
217      LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
218      costFunctions.add(func);
219    }
220  }
221
222  @RestrictedApi(explanation = "Should only be called in tests", link = "",
223      allowedOnPath = ".*/src/test/.*")
224  Map<Class<? extends CandidateGenerator>, CandidateGenerator> getCandidateGenerators() {
225    return this.candidateGenerators;
226  }
227
228  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
229    createCandidateGenerators(Configuration conf) {
230    balancerConditionals.setConf(conf);
231    Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
232    if (balancerConditionals.isReplicaDistributionEnabled()) {
233      candidateGenerators = new HashMap<>(3);
234      candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator());
235      candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator());
236      candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator);
237    } else {
238      candidateGenerators = new HashMap<>(5);
239      candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator());
240      candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator());
241      candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator);
242      candidateGenerators.put(RegionReplicaCandidateGenerator.class,
243        new RegionReplicaCandidateGenerator());
244      candidateGenerators.put(RegionReplicaRackCandidateGenerator.class,
245        new RegionReplicaRackCandidateGenerator());
246    }
247    return candidateGenerators;
248  }
249
250  protected List<CostFunction> createCostFunctions(Configuration conf) {
251    List<CostFunction> costFunctions = new ArrayList<>();
252    addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf));
253    addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf));
254    addCostFunction(costFunctions, new MoveCostFunction(conf, provider));
255    addCostFunction(costFunctions, localityCost);
256    addCostFunction(costFunctions, rackLocalityCost);
257    addCostFunction(costFunctions, new TableSkewCostFunction(conf));
258    addCostFunction(costFunctions, new StoreFileTableSkewCostFunction(conf));
259    addCostFunction(costFunctions, regionReplicaHostCostFunction);
260    addCostFunction(costFunctions, regionReplicaRackCostFunction);
261    addCostFunction(costFunctions, new ReadRequestCostFunction(conf));
262    addCostFunction(costFunctions, new CPRequestCostFunction(conf));
263    addCostFunction(costFunctions, new WriteRequestCostFunction(conf));
264    addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf));
265    addCostFunction(costFunctions, new StoreFileCostFunction(conf));
266    return costFunctions;
267  }
268
269  @Override
270  protected void loadConf(Configuration conf) {
271    super.loadConf(conf);
272    maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS);
273    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION);
274    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME);
275    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS);
276
277    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS);
278    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE);
279    localityCandidateGenerator = new LocalityBasedCandidateGenerator();
280    localityCost = new ServerLocalityCostFunction(conf);
281    rackLocalityCost = new RackLocalityCostFunction(conf);
282
283    balancerConditionals.setConf(conf);
284    this.candidateGenerators = createCandidateGenerators(conf);
285
286    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
287    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
288    this.costFunctions = createCostFunctions(conf);
289    loadCustomCostFunctions(conf);
290
291    curFunctionCosts = new double[costFunctions.size()];
292    tempFunctionCosts = new double[costFunctions.size()];
293
294    LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps
295      + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable="
296      + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames())
297      + " , sum of multiplier of cost functions = " + sumMultiplier + " etc.");
298  }
299
300  @Override
301  public void updateClusterMetrics(ClusterMetrics st) {
302    super.updateClusterMetrics(st);
303    updateRegionLoad();
304
305    // update metrics size
306    try {
307      // by-table or ensemble mode
308      int tablesCount = isByTable ? provider.getNumberOfTables() : 1;
309      int functionsCount = getCostFunctionNames().length;
310
311      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
312    } catch (Exception e) {
313      LOG.error("failed to get the size of all tables", e);
314    }
315  }
316
317  private void updateBalancerTableLoadInfo(TableName tableName,
318    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
319    RegionHDFSBlockLocationFinder finder = null;
320    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
321      finder = this.regionFinder;
322    }
323    BalancerClusterState cluster =
324      new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
325
326    initCosts(cluster);
327    curOverallCost = computeCost(cluster, Double.MAX_VALUE);
328    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
329    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
330  }
331
332  @Override
333  public void
334    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
335    if (isByTable) {
336      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
337        updateBalancerTableLoadInfo(tableName, loadOfOneTable);
338      });
339    } else {
340      updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME,
341        toEnsumbleTableLoad(loadOfAllTable));
342    }
343  }
344
345  /**
346   * Update the number of metrics that are reported to JMX
347   */
348  @RestrictedApi(explanation = "Should only be called in tests", link = "",
349      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
350  void updateMetricsSize(int size) {
351    if (metricsBalancer instanceof MetricsStochasticBalancer) {
352      ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
353    }
354  }
355
356  private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c) {
357    if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) {
358      // This check is unnecessary without replicas, or with conditional replica distribution
359      // The balancer will auto-run if conditional replica distribution candidates are available
360      return false;
361    }
362    if (c.numHosts >= c.maxReplicas) {
363      regionReplicaHostCostFunction.prepare(c);
364      double hostCost = Math.abs(regionReplicaHostCostFunction.cost());
365      boolean colocatedAtHost = hostCost > CostFunction.getCostEpsilon(hostCost);
366      if (colocatedAtHost) {
367        return true;
368      }
369      LOG.trace("No host colocation detected with host cost={}", hostCost);
370    }
371    return false;
372  }
373
374  private boolean areSomeRegionReplicasColocatedOnRack(BalancerClusterState c) {
375    if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) {
376      // This check is unnecessary without replicas, or with conditional replica distribution
377      // The balancer will auto-run if conditional replica distribution candidates are available
378      return false;
379    }
380    if (c.numRacks >= c.maxReplicas) {
381      regionReplicaRackCostFunction.prepare(c);
382      double rackCost = Math.abs(regionReplicaRackCostFunction.cost());
383      boolean colocatedAtRack = rackCost > CostFunction.getCostEpsilon(rackCost);
384      if (colocatedAtRack) {
385        return true;
386      }
387      LOG.trace("No rack colocation detected with rack cost={}", rackCost);
388    } else {
389      LOG.trace("Rack colocation is inevitable with fewer racks than replicas, "
390        + "so we won't bother checking");
391    }
392    return false;
393  }
394
395  private String getBalanceReason(double total, double sumMultiplier) {
396    if (total <= 0) {
397      return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
398    } else if (sumMultiplier <= 0) {
399      return "sumMultiplier = " + sumMultiplier + " <= 0";
400    } else if ((total / sumMultiplier) < minCostNeedBalance) {
401      return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = "
402        + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
403    } else {
404      return "";
405    }
406  }
407
408  @RestrictedApi(explanation = "Should only be called in tests", link = "",
409      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
410  boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
411    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
412    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
413      LOG.info(
414        "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
415      sendRejectionReasonToRingBuffer(() -> "The number of RegionServers " + cs.getNumServers()
416        + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
417      return false;
418    }
419
420    if (areSomeRegionReplicasColocatedOnHost(cluster)) {
421      LOG.info("Running balancer because at least one server hosts replicas of the same region."
422        + " function cost={}", functionCost());
423      return true;
424    }
425
426    if (areSomeRegionReplicasColocatedOnRack(cluster)) {
427      LOG.info("Running balancer because at least one rack hosts replicas of the same region."
428        + " function cost={}", functionCost());
429      return true;
430    }
431
432    if (idleRegionServerExist(cluster)) {
433      LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}",
434        functionCost());
435      return true;
436    }
437
438    if (
439      // table isolation is inherently incompatible with naive "sloppy server" checks
440      !balancerConditionals.isTableIsolationEnabled() && sloppyRegionServerExist(cs)
441    ) {
442      LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}",
443        functionCost());
444      return true;
445    }
446
447    if (balancerConditionals.shouldRunBalancer(cluster)) {
448      LOG.info("Running balancer because conditional candidate generators have important moves");
449      return true;
450    }
451
452    double total = 0.0;
453    float localSumMultiplier = 0; // in case this.sumMultiplier is not initialized
454    for (CostFunction c : costFunctions) {
455      if (!c.isNeeded()) {
456        LOG.trace("{} not needed", c.getClass().getSimpleName());
457        continue;
458      }
459      total += c.cost() * c.getMultiplier();
460      localSumMultiplier += c.getMultiplier();
461    }
462    sumMultiplier = localSumMultiplier;
463    boolean balanced = (total / sumMultiplier < minCostNeedBalance);
464
465    if (balanced) {
466      final double calculatedTotal = total;
467      sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, sumMultiplier),
468        costFunctions);
469      LOG.info(
470        "{} - skipping load balancing because weighted average imbalance={} <= "
471          + "threshold({}) and conditionals do not have opinionated move candidates. "
472          + "If you want more aggressive balancing, either lower "
473          + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative "
474          + "multiplier(s) of the specific cost function(s). functionCost={}",
475        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier,
476        minCostNeedBalance, minCostNeedBalance, functionCost());
477    } else {
478      LOG.info(
479        "{} - Calculating plan. may take up to {}ms to complete. currentCost={}, targetCost={}",
480        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime, total,
481        minCostNeedBalance);
482    }
483    return !balanced;
484  }
485
486  @RestrictedApi(explanation = "Should only be called in tests", link = "",
487      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
488  Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState cluster) {
489    CandidateGenerator generator = getRandomGenerator(cluster);
490    return Pair.newPair(generator, generator.generate(cluster));
491  }
492
493  /**
494   * Select the candidate generator to use based on the cost of cost functions. The chance of
495   * selecting a candidate generator is proportional to the share of cost of all cost functions
496   * among all cost functions that benefit from it.
497   */
498  protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) {
499    // Prefer conditional generators if they have moves to make
500    if (balancerConditionals.isConditionalBalancingEnabled()) {
501      for (RegionPlanConditional conditional : balancerConditionals.getConditionals()) {
502        List<RegionPlanConditionalCandidateGenerator> generators =
503          conditional.getCandidateGenerators();
504        for (RegionPlanConditionalCandidateGenerator generator : generators) {
505          if (generator.getWeight(cluster) > 0) {
506            return generator;
507          }
508        }
509      }
510    }
511
512    List<Class<? extends CandidateGenerator>> generatorClasses = shuffledGeneratorClasses.get();
513    List<Double> partialSums = new ArrayList<>(generatorClasses.size());
514    double sum = 0.0;
515    for (Class<? extends CandidateGenerator> clazz : generatorClasses) {
516      double weight = weightsOfGenerators.getOrDefault(clazz, 0.0);
517      sum += weight;
518      partialSums.add(sum);
519    }
520
521    // If the sum of all weights is zero, fall back to any generator
522    if (sum == 0.0) {
523      return pickAnyGenerator(generatorClasses);
524    }
525
526    double rand = ThreadLocalRandom.current().nextDouble();
527    // Normalize partial sums so that the last one should be exactly 1.0
528    for (int i = 0; i < partialSums.size(); i++) {
529      partialSums.set(i, partialSums.get(i) / sum);
530    }
531
532    // Generate a random number and pick the first generator whose partial sum is >= rand
533    for (int i = 0; i < partialSums.size(); i++) {
534      if (rand <= partialSums.get(i)) {
535        return candidateGenerators.get(generatorClasses.get(i));
536      }
537    }
538
539    // Fallback: if for some reason we didn't return above, return any generator
540    return pickAnyGenerator(generatorClasses);
541  }
542
543  private CandidateGenerator
544    pickAnyGenerator(List<Class<? extends CandidateGenerator>> generatorClasses) {
545    Class<? extends CandidateGenerator> randomClass =
546      generatorClasses.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
547    return candidateGenerators.get(randomClass);
548  }
549
  /**
   * Replaces the rack manager used when building cluster states; test use only.
   * @param rackManager the rack manager to use
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }
555
556  private long calculateMaxSteps(BalancerClusterState cluster) {
557    return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers;
558  }
559
560  /**
561   * Given the cluster state this will try and approach an optimal balance. This should always
562   * approach the optimal state given enough steps.
563   */
564  @Override
565  protected List<RegionPlan> balanceTable(TableName tableName,
566    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
567    // On clusters with lots of HFileLinks or lots of reference files,
568    // instantiating the storefile infos can be quite expensive.
569    // Allow turning this feature off if the locality cost is not going to
570    // be used in any computations.
571    RegionHDFSBlockLocationFinder finder = null;
572    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
573      finder = this.regionFinder;
574    }
575
576    // The clusterState that is given to this method contains the state
577    // of all the regions in the table(s) (that's true today)
578    // Keep track of servers to iterate through them.
579    BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder,
580      rackManager, regionCacheRatioOnOldServerMap);
581
582    long startTime = EnvironmentEdgeManager.currentTime();
583    cluster.setStopRequestedAt(startTime + maxRunningTime);
584
585    initCosts(cluster);
586    balancerConditionals.loadClusterState(cluster);
587    balancerConditionals.clearConditionalWeightCaches();
588
589    float localSumMultiplier = 0;
590    for (CostFunction c : costFunctions) {
591      if (c.isNeeded()) {
592        localSumMultiplier += c.getMultiplier();
593      }
594    }
595    sumMultiplier = localSumMultiplier;
596    if (sumMultiplier <= 0) {
597      LOG.error("At least one cost function needs a multiplier > 0. For example, set "
598        + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default");
599      return null;
600    }
601
602    double currentCost = computeCost(cluster, Double.MAX_VALUE);
603    curOverallCost = currentCost;
604    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
605    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
606    double initCost = currentCost;
607    double newCost;
608
609    if (!needsBalance(tableName, cluster)) {
610      return null;
611    }
612
613    long computedMaxSteps;
614    if (runMaxSteps) {
615      computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster));
616    } else {
617      long calculatedMaxSteps = calculateMaxSteps(cluster);
618      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
619      if (calculatedMaxSteps > maxSteps) {
620        LOG.warn(
621          "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
622            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
623            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
624            + "(This config change does not require service restart)",
625          calculatedMaxSteps, maxSteps);
626      }
627    }
628    LOG.info(
629      "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, "
630        + "functionCost={} computedMaxSteps={}",
631      currentCost / sumMultiplier, functionCost(), computedMaxSteps);
632
633    final String initFunctionTotalCosts = totalCostsPerFunc();
634    // Perform a stochastic walk to see if we can get a good fit.
635    long step;
636    boolean planImprovedConditionals = false;
637    Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new HashMap<>();
638    Map<Class<? extends CandidateGenerator>, Long> generatorToApprovedActionCount = new HashMap<>();
639    for (step = 0; step < computedMaxSteps; step++) {
640      Pair<CandidateGenerator, BalanceAction> nextAction = nextAction(cluster);
641      CandidateGenerator generator = nextAction.getFirst();
642      BalanceAction action = nextAction.getSecond();
643
644      if (action.getType() == BalanceAction.Type.NULL) {
645        continue;
646      }
647
648      int conditionalViolationsChange = 0;
649      boolean isViolatingConditionals = false;
650      boolean moveImprovedConditionals = false;
651      // Only check conditionals if they are enabled
652      if (balancerConditionals.isConditionalBalancingEnabled()) {
653        // Always accept a conditional generator output. Sometimes conditional generators
654        // may need to make controversial moves in order to break what would otherwise
655        // be a deadlocked situation.
656        // Otherwise, for normal moves, evaluate the action.
657        if (RegionPlanConditionalCandidateGenerator.class.isAssignableFrom(generator.getClass())) {
658          conditionalViolationsChange = -1;
659        } else {
660          conditionalViolationsChange =
661            balancerConditionals.getViolationCountChange(cluster, action);
662          isViolatingConditionals = balancerConditionals.isViolating(cluster, action);
663        }
664        moveImprovedConditionals = conditionalViolationsChange < 0;
665        if (moveImprovedConditionals) {
666          planImprovedConditionals = true;
667        }
668      }
669
670      // Change state and evaluate costs
671      try {
672        cluster.doAction(action);
673      } catch (IllegalStateException | ArrayIndexOutOfBoundsException e) {
674        LOG.warn(
675          "Generator {} produced invalid action! "
676            + "Debug your candidate generator as this is likely a bug, "
677            + "and may cause a balancer deadlock. {}",
678          generator.getClass().getSimpleName(), action, e);
679        continue;
680      }
681      updateCostsAndWeightsWithAction(cluster, action);
682      generatorToStepCount.merge(generator.getClass(), action.getStepCount(), Long::sum);
683
684      newCost = computeCost(cluster, currentCost);
685
686      double costImprovement = currentCost - newCost;
687      double minimumImprovement =
688        Math.max(CostFunction.getCostEpsilon(currentCost), CostFunction.getCostEpsilon(newCost));
689      boolean costsImproved = costImprovement > minimumImprovement;
690      boolean conditionalsSimilarCostsImproved =
691        (costsImproved && conditionalViolationsChange == 0 && !isViolatingConditionals);
692      // Our first priority is to reduce conditional violations
693      // Our second priority is to reduce balancer cost
694      // change, regardless of cost change
695      if (moveImprovedConditionals || conditionalsSimilarCostsImproved) {
696        currentCost = newCost;
697        generatorToApprovedActionCount.merge(generator.getClass(), action.getStepCount(),
698          Long::sum);
699
700        // save for JMX
701        curOverallCost = currentCost;
702        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
703      } else {
704        // Put things back the way they were before.
705        // TODO: undo by remembering old values
706        BalanceAction undoAction = action.undoAction();
707        cluster.doAction(undoAction);
708        updateCostsAndWeightsWithAction(cluster, undoAction);
709      }
710
711      if (cluster.isStopRequested()) {
712        break;
713      }
714    }
715    long endTime = EnvironmentEdgeManager.currentTime();
716
717    StringJoiner joiner = new StringJoiner("\n");
718    joiner.add("CandidateGenerator activity summary:");
719    generatorToStepCount.forEach((generator, count) -> {
720      long approvals = generatorToApprovedActionCount.getOrDefault(generator, 0L);
721      joiner.add(String.format(" - %s: %d steps, %d approvals", generator.getSimpleName(), count,
722        approvals));
723    });
724    LOG.debug(joiner.toString());
725
726    metricsBalancer.balanceCluster(endTime - startTime);
727
728    if (planImprovedConditionals || (initCost > currentCost)) {
729      updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
730      List<RegionPlan> plans = createRegionPlans(cluster);
731      LOG.info(
732        "Finished computing new moving plan. Computation took {} ms"
733          + " to try {} different iterations.  Found a solution that moves "
734          + "{} regions; Going from a computed imbalance of {}"
735          + " to a new imbalance of {}. funtionCost={}",
736        endTime - startTime, step, plans.size(), initCost / sumMultiplier,
737        currentCost / sumMultiplier, functionCost());
738      sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
739      return plans;
740    }
741    LOG.info(
742      "Could not find a better moving plan.  Tried {} different configurations in "
743        + "{} ms, and did not find anything with an imbalance score less than {} "
744        + "and could not improve conditional violations",
745      step, endTime - startTime, initCost / sumMultiplier);
746    return null;
747  }
748
749  protected void sendRejectionReasonToRingBuffer(Supplier<String> reason,
750    List<CostFunction> costFunctions) {
751    provider.recordBalancerRejection(() -> {
752      BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get());
753      if (costFunctions != null) {
754        for (CostFunction c : costFunctions) {
755          if (!c.isNeeded()) {
756            continue;
757          }
758          builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
759        }
760      }
761      return builder.build();
762    });
763  }
764
765  private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
766    double initCost, String initFunctionTotalCosts, long step) {
767    provider.recordBalancerDecision(() -> {
768      List<String> regionPlans = new ArrayList<>();
769      for (RegionPlan plan : plans) {
770        regionPlans
771          .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
772            + " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
773      }
774      return new BalancerDecision.Builder().setInitTotalCost(initCost)
775        .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost)
776        .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step)
777        .setRegionPlans(regionPlans).build();
778    });
779  }
780
781  /**
782   * update costs to JMX
783   */
784  private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) {
785    if (tableName == null) {
786      return;
787    }
788
789    // check if the metricsBalancer is MetricsStochasticBalancer before casting
790    if (metricsBalancer instanceof MetricsStochasticBalancer) {
791      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
792      // overall cost
793      balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME,
794        "Overall cost", overall);
795
796      // each cost function
797      for (int i = 0; i < costFunctions.size(); i++) {
798        CostFunction costFunction = costFunctions.get(i);
799        String costFunctionName = costFunction.getClass().getSimpleName();
800        double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
801        // TODO: cost function may need a specific description
802        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
803          "The percent of " + costFunctionName, costPercent);
804      }
805    }
806  }
807
808  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
809    float multiplier = costFunction.getMultiplier();
810    if (multiplier > 0) {
811      costFunctions.add(costFunction);
812    }
813  }
814
815  protected String functionCost() {
816    StringBuilder builder = new StringBuilder();
817    for (CostFunction c : costFunctions) {
818      builder.append(c.getClass().getSimpleName());
819      builder.append(" : (");
820      if (c.isNeeded()) {
821        builder.append("multiplier=" + c.getMultiplier());
822        builder.append(", ");
823        double cost = c.cost();
824        builder.append("imbalance=" + cost);
825        if (cost >= minCostNeedBalance) {
826          builder.append(", need balance");
827        }
828      } else {
829        builder.append("not needed");
830      }
831      builder.append("); ");
832    }
833    return builder.toString();
834  }
835
836  @RestrictedApi(explanation = "Should only be called in tests", link = "",
837      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
838  List<CostFunction> getCostFunctions() {
839    return costFunctions;
840  }
841
842  private String totalCostsPerFunc() {
843    StringBuilder builder = new StringBuilder();
844    for (CostFunction c : costFunctions) {
845      if (!c.isNeeded()) {
846        continue;
847      }
848      double cost = c.getMultiplier() * c.cost();
849      if (cost > 0.0) {
850        builder.append(" ");
851        builder.append(c.getClass().getSimpleName());
852        builder.append(" : ");
853        builder.append(cost);
854        builder.append(";");
855      }
856    }
857    if (builder.length() > 0) {
858      builder.deleteCharAt(builder.length() - 1);
859    }
860    return builder.toString();
861  }
862
863  /**
864   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
865   * state.
866   * @param cluster The state of the cluster
867   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
868   */
869  private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
870    List<RegionPlan> plans = new ArrayList<>();
871    for (int regionIndex = 0; regionIndex
872        < cluster.regionIndexToServerIndex.length; regionIndex++) {
873      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
874      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
875
876      if (initialServerIndex != newServerIndex) {
877        RegionInfo region = cluster.regions[regionIndex];
878        ServerName initialServer = cluster.servers[initialServerIndex];
879        ServerName newServer = cluster.servers[newServerIndex];
880
881        if (LOG.isTraceEnabled()) {
882          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
883            + initialServer.getHostname() + " to " + newServer.getHostname());
884        }
885        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
886        plans.add(rp);
887      }
888    }
889    return plans;
890  }
891
892  /**
893   * Store the current region loads.
894   */
895  private void updateRegionLoad() {
896    // We create a new hashmap so that regions that are no longer there are removed.
897    // However we temporarily need the old loads so we can use them to keep the rolling average.
898    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
899    loads = new HashMap<>();
900
901    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
902      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
903        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
904        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
905        if (rLoads == null) {
906          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
907        } else if (rLoads.size() >= numRegionLoadsToRemember) {
908          rLoads.remove();
909        }
910        rLoads.add(new BalancerRegionLoad(rm));
911        loads.put(regionNameAsString, rLoads);
912      });
913    });
914  }
915
916  @RestrictedApi(explanation = "Should only be called in tests", link = "",
917      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
918  void initCosts(BalancerClusterState cluster) {
919    weightsOfGenerators.clear();
920    for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
921      weightsOfGenerators.put(clazz, 0.0);
922    }
923    for (CostFunction c : costFunctions) {
924      c.prepare(cluster);
925      c.updateWeight(weightsOfGenerators);
926    }
927  }
928
929  /**
930   * Update both the costs of costfunctions and the weights of candidate generators
931   */
932  @RestrictedApi(explanation = "Should only be called in tests", link = "",
933      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
934  void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
935    // Reset all the weights to 0
936    for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
937      weightsOfGenerators.put(clazz, 0.0);
938    }
939    for (CostFunction c : costFunctions) {
940      if (c.isNeeded()) {
941        c.postAction(action);
942        c.updateWeight(weightsOfGenerators);
943      }
944    }
945  }
946
947  /**
948   * Get the names of the cost functions
949   */
950  @RestrictedApi(explanation = "Should only be called in tests", link = "",
951      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
952  String[] getCostFunctionNames() {
953    String[] ret = new String[costFunctions.size()];
954    for (int i = 0; i < costFunctions.size(); i++) {
955      CostFunction c = costFunctions.get(i);
956      ret[i] = c.getClass().getSimpleName();
957    }
958
959    return ret;
960  }
961
962  /**
963   * This is the main cost function. It will compute a cost associated with a proposed cluster
964   * state. All different costs will be combined with their multipliers to produce a double cost.
965   * @param cluster      The state of the cluster
966   * @param previousCost the previous cost. This is used as an early out.
967   * @return a double of a cost associated with the proposed cluster state. This cost is an
968   *         aggregate of all individual cost functions.
969   */
970  @RestrictedApi(explanation = "Should only be called in tests", link = "",
971      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
972  double computeCost(BalancerClusterState cluster, double previousCost) {
973    double total = 0;
974
975    for (int i = 0; i < costFunctions.size(); i++) {
976      CostFunction c = costFunctions.get(i);
977      this.tempFunctionCosts[i] = 0.0;
978
979      if (!c.isNeeded()) {
980        continue;
981      }
982
983      Float multiplier = c.getMultiplier();
984      double cost = c.cost();
985
986      this.tempFunctionCosts[i] = multiplier * cost;
987      total += this.tempFunctionCosts[i];
988
989      if (total > previousCost) {
990        break;
991      }
992    }
993
994    return total;
995  }
996
997  /**
998   * A helper function to compose the attribute name from tablename and costfunction name
999   */
1000  static String composeAttributeName(String tableName, String costFunctionName) {
1001    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1002  }
1003}