001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.lang.reflect.Constructor;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.Deque;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.StringJoiner;
031import java.util.concurrent.ThreadLocalRandom;
032import java.util.concurrent.TimeUnit;
033import java.util.function.Supplier;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ClusterMetrics;
036import org.apache.hadoop.hbase.HBaseInterfaceAudience;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.RegionMetrics;
039import org.apache.hadoop.hbase.ServerMetrics;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.BalancerDecision;
043import org.apache.hadoop.hbase.client.BalancerRejection;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.master.RackManager;
046import org.apache.hadoop.hbase.master.RegionPlan;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.hadoop.hbase.util.ReflectionUtils;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
055
056/**
057 * <p>
058 * This is a best effort load balancer. Given a Cost function F(C) =&gt; x It will randomly try and
059 * mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the new cluster state becomes the plan.
060 * It includes costs functions to compute the cost of:
061 * </p>
062 * <ul>
063 * <li>Region Load</li>
064 * <li>Table Load</li>
065 * <li>Data Locality</li>
066 * <li>Memstore Sizes</li>
067 * <li>Storefile Sizes</li>
068 * </ul>
069 * <p>
070 * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best
071 * solution, and 1 is the highest possible cost and the worst solution. The computed costs are
072 * scaled by their respective multipliers:
073 * </p>
074 * <ul>
075 * <li>hbase.master.balancer.stochastic.regionLoadCost</li>
076 * <li>hbase.master.balancer.stochastic.moveCost</li>
077 * <li>hbase.master.balancer.stochastic.tableLoadCost</li>
078 * <li>hbase.master.balancer.stochastic.localityCost</li>
079 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
080 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
081 * </ul>
082 * <p>
 * You can also add custom cost functions by setting the following configuration value:
084 * </p>
085 * <ul>
086 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
087 * </ul>
088 * <p>
 * All custom cost functions need to extend {@link CostFunction}
090 * </p>
091 * <p>
092 * In addition to the above configurations, the balancer can be tuned by the following configuration
093 * values:
094 * </p>
095 * <ul>
096 * <li>hbase.master.balancer.stochastic.maxMoveRegions which controls what the max number of regions
097 * that can be moved in a single invocation of this balancer.</li>
098 * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
099 * regions is multiplied to try and get the number of times the balancer will mutate all
100 * servers.</li>
101 * <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that the
102 * balancer will try and mutate all the servers. The balancer will use the minimum of this value and
103 * the above computation.</li>
104 * </ul>
105 * <p>
106 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the
107 * balancer gets the full picture of all loads on the cluster.
108 * </p>
109 */
110@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
111public class StochasticLoadBalancer extends BaseLoadBalancer {
112
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Configuration keys and their default values. The values are read in loadConf().
  protected static final String STEPS_PER_REGION_KEY =
    "hbase.master.balancer.stochastic.stepsPerRegion";
  protected static final int DEFAULT_STEPS_PER_REGION = 800;
  protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
  protected static final int DEFAULT_MAX_STEPS = 1000000;
  protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps";
  protected static final boolean DEFAULT_RUN_MAX_STEPS = false;
  protected static final String MAX_RUNNING_TIME_KEY =
    "hbase.master.balancer.stochastic.maxRunningTime";
  protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds.
  protected static final String KEEP_REGION_LOADS =
    "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  protected static final int DEFAULT_KEEP_REGION_LOADS = 15;
  private static final String TABLE_FUNCTION_SEP = "_";
  protected static final String MIN_COST_NEED_BALANCE_KEY =
    "hbase.master.balancer.stochastic.minCostNeedBalance";
  protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f;
  // Comma separated list of additional CostFunction class names, see loadCustomCostFunctions()
  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
    "hbase.master.balancer.stochastic.additionalCostFunctions";
  public static final String OVERALL_COST_FUNCTION_NAME = "Overall";

  // Remembered region loads, handed to createState() when a cluster state is built.
  // NOTE(review): presumably keyed by region name -- confirm against updateRegionLoad()
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // Tunables. The initializers are the defaults; loadConf() overwrites them from configuration.
  private int maxSteps = DEFAULT_MAX_STEPS;
  private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS;
  private int stepsPerRegion = DEFAULT_STEPS_PER_REGION;
  private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
  private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
  private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
  // NOTE(review): judging by the name, maps a region to its old server and cache ratio -- confirm
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>();

  protected List<CostFunction> costFunctions; // FindBugs: Wants this protected;
                                              // IS2_INCONSISTENT_SYNC
  // Sum of the multipliers of the cost functions that are currently needed. It has no explicit
  // initializer; it is recomputed by needsBalance() and balanceTable().
  private float sumMultiplier;
  // to save and report costs to JMX
  private double curOverallCost = 0d;
  // Both arrays are sized to costFunctions.size() in loadConf(); tempFunctionCosts is copied
  // into curFunctionCosts whenever a newly computed cost is kept.
  private double[] tempFunctionCosts;
  private double[] curFunctionCosts;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

  // Selection weight per generator class, read by getRandomGenerator(); a missing entry counts
  // as weight 0.
  private final Map<Class<? extends CandidateGenerator>, Double> weightsOfGenerators =
    new HashMap<>();
  // Generator instances keyed by class; replaced wholesale on every loadConf().
  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
  // Shuffled snapshot of the generator classes, memoized with a 5 second expiration so that the
  // iteration order used by getRandomGenerator() changes periodically rather than on every call.
  // Because it is a snapshot, it can briefly lag behind a replaced candidateGenerators map.
  protected final Supplier<List<Class<? extends CandidateGenerator>>> shuffledGeneratorClasses =
    Suppliers.memoizeWithExpiration(() -> {
      List<Class<? extends CandidateGenerator>> shuffled =
        new ArrayList<>(candidateGenerators.keySet());
      Collections.shuffle(shuffled);
      return shuffled;
    }, 5, TimeUnit.SECONDS);

  // Conditional balancing rules; (re)configured through setConf() whenever configuration loads.
  private final BalancerConditionals balancerConditionals = BalancerConditionals.create();
176
177  /**
178   * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
179   * default MetricsBalancer
180   */
181  public StochasticLoadBalancer() {
182    super(new MetricsStochasticBalancer());
183  }
184
185  @RestrictedApi(explanation = "Should only be called in tests", link = "",
186      allowedOnPath = ".*/src/test/.*")
187  public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) {
188    super(metricsStochasticBalancer);
189  }
190
191  private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
192    Configuration conf) {
193    try {
194      Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class);
195      return ReflectionUtils.instantiate(clazz.getName(), ctor, conf);
196    } catch (NoSuchMethodException e) {
197      // will try construct with no parameter
198    }
199    return ReflectionUtils.newInstance(clazz);
200  }
201
202  private void loadCustomCostFunctions(Configuration conf) {
203    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
204
205    if (null == functionsNames) {
206      return;
207    }
208    for (String className : functionsNames) {
209      Class<? extends CostFunction> clazz;
210      try {
211        clazz = Class.forName(className).asSubclass(CostFunction.class);
212      } catch (ClassNotFoundException e) {
213        LOG.warn("Cannot load class '{}': {}", className, e.getMessage());
214        continue;
215      }
216      CostFunction func = createCostFunction(clazz, conf);
217      LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
218      costFunctions.add(func);
219    }
220  }
221
222  @RestrictedApi(explanation = "Should only be called in tests", link = "",
223      allowedOnPath = ".*/src/test/.*")
224  Map<Class<? extends CandidateGenerator>, CandidateGenerator> getCandidateGenerators() {
225    return this.candidateGenerators;
226  }
227
228  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
229    createCandidateGenerators(Configuration conf) {
230    balancerConditionals.setConf(conf);
231    Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
232    if (balancerConditionals.isReplicaDistributionEnabled()) {
233      candidateGenerators = new HashMap<>(3);
234      candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator());
235      candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator());
236      candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator);
237    } else {
238      candidateGenerators = new HashMap<>(5);
239      candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator());
240      candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator());
241      candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator);
242      candidateGenerators.put(RegionReplicaCandidateGenerator.class,
243        new RegionReplicaCandidateGenerator());
244      candidateGenerators.put(RegionReplicaRackCandidateGenerator.class,
245        new RegionReplicaRackCandidateGenerator());
246    }
247    return candidateGenerators;
248  }
249
250  protected List<CostFunction> createCostFunctions(Configuration conf) {
251    List<CostFunction> costFunctions = new ArrayList<>();
252    addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf));
253    addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf));
254    addCostFunction(costFunctions, new MoveCostFunction(conf, provider));
255    addCostFunction(costFunctions, localityCost);
256    addCostFunction(costFunctions, rackLocalityCost);
257    addCostFunction(costFunctions, new TableSkewCostFunction(conf));
258    addCostFunction(costFunctions, new StoreFileTableSkewCostFunction(conf));
259    addCostFunction(costFunctions, regionReplicaHostCostFunction);
260    addCostFunction(costFunctions, regionReplicaRackCostFunction);
261    addCostFunction(costFunctions, new ReadRequestCostFunction(conf));
262    addCostFunction(costFunctions, new CPRequestCostFunction(conf));
263    addCostFunction(costFunctions, new WriteRequestCostFunction(conf));
264    addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf));
265    addCostFunction(costFunctions, new StoreFileCostFunction(conf));
266    return costFunctions;
267  }
268
  /**
   * Reloads every tunable of this balancer from {@code conf} and rebuilds the candidate generators
   * and cost functions. The order of the statements matters: the locality generator and the
   * locality / region replica cost functions are stored in fields before the overridable factory
   * methods that read those fields are invoked.
   * @param conf the configuration to load
   */
  @Override
  protected void loadConf(Configuration conf) {
    super.loadConf(conf);
    maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS);
    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION);
    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME);
    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS);

    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS);
    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE);
    // Must be assigned before createCandidateGenerators() / createCostFunctions() read them
    localityCandidateGenerator = new LocalityBasedCandidateGenerator();
    localityCost = new ServerLocalityCostFunction(conf);
    rackLocalityCost = new RackLocalityCostFunction(conf);

    // createCandidateGenerators() in this class also calls setConf, but it is protected and may
    // be overridden, so the conditionals are configured here as well
    balancerConditionals.setConf(conf);
    this.candidateGenerators = createCandidateGenerators(conf);

    // Must be assigned before createCostFunctions() reads them
    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
    this.costFunctions = createCostFunctions(conf);
    loadCustomCostFunctions(conf);

    // One slot per cost function, custom functions included
    curFunctionCosts = new double[costFunctions.size()];
    tempFunctionCosts = new double[costFunctions.size()];

    // Note: sumMultiplier is not assigned in this method, so the value logged below is whatever
    // the field held before the reload
    LOG.info(
      "Loaded config: maxSteps={}, runMaxSteps={}, stepsPerRegion={}, maxRunningTime={}, "
        + "isByTable={}, CostFunctions={}, sum of multiplier of cost functions = {} etc.",
      maxSteps, runMaxSteps, stepsPerRegion, maxRunningTime, isByTable,
      Arrays.toString(getCostFunctionNames()), sumMultiplier);
  }
300
301  @Override
302  public void updateClusterMetrics(ClusterMetrics st) {
303    super.updateClusterMetrics(st);
304    updateRegionLoad();
305
306    // update metrics size
307    try {
308      // by-table or ensemble mode
309      int tablesCount = isByTable ? provider.getNumberOfTables() : 1;
310      int functionsCount = getCostFunctionNames().length;
311
312      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
313    } catch (Exception e) {
314      LOG.error("failed to get the size of all tables", e);
315    }
316  }
317
318  private void updateBalancerTableLoadInfo(TableName tableName,
319    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
320    RegionHDFSBlockLocationFinder finder = null;
321    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
322      finder = this.regionFinder;
323    }
324    BalancerClusterState cluster = createState(loadOfOneTable, loads, finder, rackManager);
325
326    initCosts(cluster);
327    curOverallCost = computeCost(cluster, Double.MAX_VALUE);
328    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
329    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
330  }
331
332  protected BalancerClusterState createState(Map<ServerName, List<RegionInfo>> clusterState,
333    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder finder,
334    RackManager rackManager) {
335    return new BalancerClusterState(clusterState, loads, finder, rackManager);
336  }
337
338  @Override
339  public void
340    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
341    if (isByTable) {
342      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
343        updateBalancerTableLoadInfo(tableName, loadOfOneTable);
344      });
345    } else {
346      updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME,
347        toEnsumbleTableLoad(loadOfAllTable));
348    }
349  }
350
351  /**
352   * Update the number of metrics that are reported to JMX
353   */
354  @RestrictedApi(explanation = "Should only be called in tests", link = "",
355      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
356  void updateMetricsSize(int size) {
357    if (metricsBalancer instanceof MetricsStochasticBalancer) {
358      ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
359    }
360  }
361
362  private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c) {
363    if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) {
364      // This check is unnecessary without replicas, or with conditional replica distribution
365      // The balancer will auto-run if conditional replica distribution candidates are available
366      return false;
367    }
368    if (c.numHosts >= c.maxReplicas) {
369      regionReplicaHostCostFunction.prepare(c);
370      double hostCost = Math.abs(regionReplicaHostCostFunction.cost());
371      boolean colocatedAtHost = hostCost > CostFunction.getCostEpsilon(hostCost);
372      if (colocatedAtHost) {
373        return true;
374      }
375      LOG.trace("No host colocation detected with host cost={}", hostCost);
376    }
377    return false;
378  }
379
380  private boolean areSomeRegionReplicasColocatedOnRack(BalancerClusterState c) {
381    if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) {
382      // This check is unnecessary without replicas, or with conditional replica distribution
383      // The balancer will auto-run if conditional replica distribution candidates are available
384      return false;
385    }
386    if (c.numRacks >= c.maxReplicas) {
387      regionReplicaRackCostFunction.prepare(c);
388      double rackCost = Math.abs(regionReplicaRackCostFunction.cost());
389      boolean colocatedAtRack = rackCost > CostFunction.getCostEpsilon(rackCost);
390      if (colocatedAtRack) {
391        return true;
392      }
393      LOG.trace("No rack colocation detected with rack cost={}", rackCost);
394    } else {
395      LOG.trace("Rack colocation is inevitable with fewer racks than replicas, "
396        + "so we won't bother checking");
397    }
398    return false;
399  }
400
401  private String getBalanceReason(double total, double sumMultiplier) {
402    if (total <= 0) {
403      return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
404    } else if (sumMultiplier <= 0) {
405      return "sumMultiplier = " + sumMultiplier + " <= 0";
406    } else if ((total / sumMultiplier) < minCostNeedBalance) {
407      return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = "
408        + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
409    } else {
410      return "";
411    }
412  }
413
414  @RestrictedApi(explanation = "Should only be called in tests", link = "",
415      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
416  boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
417    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
418    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
419      LOG.info(
420        "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
421      sendRejectionReasonToRingBuffer(() -> "The number of RegionServers " + cs.getNumServers()
422        + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
423      return false;
424    }
425
426    if (areSomeRegionReplicasColocatedOnHost(cluster)) {
427      LOG.info("Running balancer because at least one server hosts replicas of the same region."
428        + " function cost={}", functionCost());
429      return true;
430    }
431
432    if (areSomeRegionReplicasColocatedOnRack(cluster)) {
433      LOG.info("Running balancer because at least one rack hosts replicas of the same region."
434        + " function cost={}", functionCost());
435      return true;
436    }
437
438    if (idleRegionServerExist(cluster)) {
439      LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}",
440        functionCost());
441      return true;
442    }
443
444    if (
445      // table isolation is inherently incompatible with naive "sloppy server" checks
446      !balancerConditionals.isTableIsolationEnabled() && sloppyRegionServerExist(cs)
447    ) {
448      LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}",
449        functionCost());
450      return true;
451    }
452
453    if (balancerConditionals.shouldRunBalancer(cluster)) {
454      LOG.info("Running balancer because conditional candidate generators have important moves");
455      return true;
456    }
457
458    double total = 0.0;
459    float localSumMultiplier = 0; // in case this.sumMultiplier is not initialized
460    for (CostFunction c : costFunctions) {
461      if (!c.isNeeded()) {
462        LOG.trace("{} not needed", c.getClass().getSimpleName());
463        continue;
464      }
465      total += c.cost() * c.getMultiplier();
466      localSumMultiplier += c.getMultiplier();
467    }
468    sumMultiplier = localSumMultiplier;
469    boolean balanced = (total / sumMultiplier < minCostNeedBalance);
470
471    if (balanced) {
472      final double calculatedTotal = total;
473      sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, sumMultiplier),
474        costFunctions);
475      LOG.info(
476        "{} - skipping load balancing because weighted average imbalance={} <= "
477          + "threshold({}) and conditionals do not have opinionated move candidates. "
478          + "If you want more aggressive balancing, either lower "
479          + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative "
480          + "multiplier(s) of the specific cost function(s). functionCost={}",
481        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier,
482        minCostNeedBalance, minCostNeedBalance, functionCost());
483    } else {
484      LOG.info(
485        "{} - Calculating plan. may take up to {}ms to complete. currentCost={}, targetCost={}",
486        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime, total,
487        minCostNeedBalance);
488    }
489    return !balanced;
490  }
491
492  @RestrictedApi(explanation = "Should only be called in tests", link = "",
493      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
494  Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState cluster) {
495    CandidateGenerator generator = getRandomGenerator(cluster);
496    return Pair.newPair(generator, generator.generate(cluster));
497  }
498
499  /**
500   * Select the candidate generator to use based on the cost of cost functions. The chance of
501   * selecting a candidate generator is proportional to the share of cost of all cost functions
502   * among all cost functions that benefit from it.
503   */
504  protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) {
505    // Prefer conditional generators if they have moves to make
506    if (balancerConditionals.isConditionalBalancingEnabled()) {
507      for (RegionPlanConditional conditional : balancerConditionals.getConditionals()) {
508        List<RegionPlanConditionalCandidateGenerator> generators =
509          conditional.getCandidateGenerators();
510        for (RegionPlanConditionalCandidateGenerator generator : generators) {
511          if (generator.getWeight(cluster) > 0) {
512            return generator;
513          }
514        }
515      }
516    }
517
518    List<Class<? extends CandidateGenerator>> generatorClasses = shuffledGeneratorClasses.get();
519    List<Double> partialSums = new ArrayList<>(generatorClasses.size());
520    double sum = 0.0;
521    for (Class<? extends CandidateGenerator> clazz : generatorClasses) {
522      double weight = weightsOfGenerators.getOrDefault(clazz, 0.0);
523      sum += weight;
524      partialSums.add(sum);
525    }
526
527    // If the sum of all weights is zero, fall back to any generator
528    if (sum == 0.0) {
529      return pickAnyGenerator(generatorClasses);
530    }
531
532    double rand = ThreadLocalRandom.current().nextDouble();
533    // Normalize partial sums so that the last one should be exactly 1.0
534    for (int i = 0; i < partialSums.size(); i++) {
535      partialSums.set(i, partialSums.get(i) / sum);
536    }
537
538    // Generate a random number and pick the first generator whose partial sum is >= rand
539    for (int i = 0; i < partialSums.size(); i++) {
540      if (rand <= partialSums.get(i)) {
541        return candidateGenerators.get(generatorClasses.get(i));
542      }
543    }
544
545    // Fallback: if for some reason we didn't return above, return any generator
546    return pickAnyGenerator(generatorClasses);
547  }
548
549  private CandidateGenerator
550    pickAnyGenerator(List<Class<? extends CandidateGenerator>> generatorClasses) {
551    Class<? extends CandidateGenerator> randomClass =
552      generatorClasses.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
553    return candidateGenerators.get(randomClass);
554  }
555
556  @RestrictedApi(explanation = "Should only be called in tests", link = "",
557      allowedOnPath = ".*/src/test/.*")
558  void setRackManager(RackManager rackManager) {
559    this.rackManager = rackManager;
560  }
561
562  private long calculateMaxSteps(BalancerClusterState cluster) {
563    return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers;
564  }
565
566  /**
567   * Given the cluster state this will try and approach an optimal balance. This should always
568   * approach the optimal state given enough steps.
569   */
570  @Override
571  protected List<RegionPlan> balanceTable(TableName tableName,
572    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
573    // On clusters with lots of HFileLinks or lots of reference files,
574    // instantiating the storefile infos can be quite expensive.
575    // Allow turning this feature off if the locality cost is not going to
576    // be used in any computations.
577    RegionHDFSBlockLocationFinder finder = null;
578    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
579      finder = this.regionFinder;
580    }
581
582    // The clusterState that is given to this method contains the state
583    // of all the regions in the table(s) (that's true today)
584    // Keep track of servers to iterate through them.
585    BalancerClusterState cluster = createState(loadOfOneTable, loads, finder, rackManager);
586
587    long startTime = EnvironmentEdgeManager.currentTime();
588    cluster.setStopRequestedAt(startTime + maxRunningTime);
589
590    initCosts(cluster);
591    balancerConditionals.loadClusterState(cluster);
592    balancerConditionals.clearConditionalWeightCaches();
593
594    float localSumMultiplier = 0;
595    for (CostFunction c : costFunctions) {
596      if (c.isNeeded()) {
597        localSumMultiplier += c.getMultiplier();
598      }
599    }
600    sumMultiplier = localSumMultiplier;
601    if (sumMultiplier <= 0) {
602      LOG.error("At least one cost function needs a multiplier > 0. For example, set "
603        + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default");
604      return null;
605    }
606
607    double currentCost = computeCost(cluster, Double.MAX_VALUE);
608    curOverallCost = currentCost;
609    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
610    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
611    double initCost = currentCost;
612    double newCost;
613
614    if (!needsBalance(tableName, cluster)) {
615      return null;
616    }
617
618    long computedMaxSteps;
619    if (runMaxSteps) {
620      computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster));
621    } else {
622      long calculatedMaxSteps = calculateMaxSteps(cluster);
623      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
624      if (calculatedMaxSteps > maxSteps) {
625        LOG.warn(
626          "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
627            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
628            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
629            + "(This config change does not require service restart)",
630          calculatedMaxSteps, maxSteps);
631      }
632    }
633    LOG.info(
634      "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, "
635        + "functionCost={} computedMaxSteps={}",
636      currentCost / sumMultiplier, functionCost(), computedMaxSteps);
637
638    final String initFunctionTotalCosts = totalCostsPerFunc();
639    // Perform a stochastic walk to see if we can get a good fit.
640    long step;
641    boolean planImprovedConditionals = false;
642    Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new HashMap<>();
643    Map<Class<? extends CandidateGenerator>, Long> generatorToApprovedActionCount = new HashMap<>();
644    for (step = 0; step < computedMaxSteps; step++) {
645      Pair<CandidateGenerator, BalanceAction> nextAction = nextAction(cluster);
646      CandidateGenerator generator = nextAction.getFirst();
647      BalanceAction action = nextAction.getSecond();
648
649      if (action.getType() == BalanceAction.Type.NULL) {
650        continue;
651      }
652
653      int conditionalViolationsChange = 0;
654      boolean isViolatingConditionals = false;
655      boolean moveImprovedConditionals = false;
656      // Only check conditionals if they are enabled
657      if (balancerConditionals.isConditionalBalancingEnabled()) {
658        // Always accept a conditional generator output. Sometimes conditional generators
659        // may need to make controversial moves in order to break what would otherwise
660        // be a deadlocked situation.
661        // Otherwise, for normal moves, evaluate the action.
662        if (RegionPlanConditionalCandidateGenerator.class.isAssignableFrom(generator.getClass())) {
663          conditionalViolationsChange = -1;
664        } else {
665          conditionalViolationsChange =
666            balancerConditionals.getViolationCountChange(cluster, action);
667          isViolatingConditionals = balancerConditionals.isViolating(cluster, action);
668        }
669        moveImprovedConditionals = conditionalViolationsChange < 0;
670        if (moveImprovedConditionals) {
671          planImprovedConditionals = true;
672        }
673      }
674
675      // Change state and evaluate costs
676      try {
677        cluster.doAction(action);
678      } catch (IllegalStateException | ArrayIndexOutOfBoundsException e) {
679        LOG.warn(
680          "Generator {} produced invalid action! "
681            + "Debug your candidate generator as this is likely a bug, "
682            + "and may cause a balancer deadlock. {}",
683          generator.getClass().getSimpleName(), action, e);
684        continue;
685      }
686      updateCostsAndWeightsWithAction(cluster, action);
687      generatorToStepCount.merge(generator.getClass(), action.getStepCount(), Long::sum);
688
689      newCost = computeCost(cluster, currentCost);
690
691      if (LOG.isDebugEnabled() && action.getType() == BalanceAction.Type.MOVE_REGION) {
692        LOG.debug(
693          "action moving region {} from {} to {} with cost {}. currentCost={}, functionCost={}",
694          cluster.regions[((MoveRegionAction) action).getRegion()].getEncodedName(),
695          cluster.servers[((MoveRegionAction) action).getFromServer()].getServerName(),
696          cluster.servers[((MoveRegionAction) action).getToServer()].getServerName(), newCost,
697          currentCost, functionCost());
698      }
699
700      double costImprovement = currentCost - newCost;
701      double minimumImprovement =
702        Math.max(CostFunction.getCostEpsilon(currentCost), CostFunction.getCostEpsilon(newCost));
703      boolean costsImproved = costImprovement > minimumImprovement;
704      boolean conditionalsSimilarCostsImproved =
705        (costsImproved && conditionalViolationsChange == 0 && !isViolatingConditionals);
      // Our first priority is to reduce conditional violations: a move that reduces them is
      // accepted regardless of cost change.
      // Our second priority is to reduce balancer cost without changing conditional violations.
709      if (moveImprovedConditionals || conditionalsSimilarCostsImproved) {
710        currentCost = newCost;
711        generatorToApprovedActionCount.merge(generator.getClass(), action.getStepCount(),
712          Long::sum);
713
714        // save for JMX
715        curOverallCost = currentCost;
716        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
717      } else {
718        // Put things back the way they were before.
719        // TODO: undo by remembering old values
720        BalanceAction undoAction = action.undoAction();
721        cluster.doAction(undoAction);
722        updateCostsAndWeightsWithAction(cluster, undoAction);
723      }
724
725      if (cluster.isStopRequested()) {
726        break;
727      }
728    }
729    long endTime = EnvironmentEdgeManager.currentTime();
730
731    StringJoiner joiner = new StringJoiner("\n");
732    joiner.add("CandidateGenerator activity summary:");
733    generatorToStepCount.forEach((generator, count) -> {
734      long approvals = generatorToApprovedActionCount.getOrDefault(generator, 0L);
735      joiner.add(String.format(" - %s: %d steps, %d approvals", generator.getSimpleName(), count,
736        approvals));
737    });
738    LOG.debug(joiner.toString());
739
740    metricsBalancer.balanceCluster(endTime - startTime);
741
742    if (planImprovedConditionals || (initCost > currentCost)) {
743      updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
744      List<RegionPlan> plans = createRegionPlans(cluster);
745      LOG.info(
746        "Finished computing new moving plan. Computation took {} ms"
747          + " to try {} different iterations.  Found a solution that moves "
748          + "{} regions; Going from a computed imbalance of {}"
749          + " to a new imbalance of {}. funtionCost={}",
750        endTime - startTime, step, plans.size(), initCost / sumMultiplier,
751        currentCost / sumMultiplier, functionCost());
752      sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
753      return plans;
754    }
755    LOG.info(
756      "Could not find a better moving plan.  Tried {} different configurations in "
757        + "{} ms, and did not find anything with an imbalance score less than {} "
758        + "and could not improve conditional violations",
759      step, endTime - startTime, initCost / sumMultiplier);
760    return null;
761  }
762
763  protected void sendRejectionReasonToRingBuffer(Supplier<String> reason,
764    List<CostFunction> costFunctions) {
765    provider.recordBalancerRejection(() -> {
766      BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get());
767      if (costFunctions != null) {
768        for (CostFunction c : costFunctions) {
769          if (!c.isNeeded()) {
770            continue;
771          }
772          builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
773        }
774      }
775      return builder.build();
776    });
777  }
778
779  private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
780    double initCost, String initFunctionTotalCosts, long step) {
781    provider.recordBalancerDecision(() -> {
782      List<String> regionPlans = new ArrayList<>();
783      for (RegionPlan plan : plans) {
784        regionPlans
785          .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
786            + " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
787      }
788      return new BalancerDecision.Builder().setInitTotalCost(initCost)
789        .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost)
790        .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step)
791        .setRegionPlans(regionPlans).build();
792    });
793  }
794
795  /**
796   * update costs to JMX
797   */
798  private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) {
799    if (tableName == null) {
800      return;
801    }
802
803    // check if the metricsBalancer is MetricsStochasticBalancer before casting
804    if (metricsBalancer instanceof MetricsStochasticBalancer) {
805      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
806      // overall cost
807      balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME,
808        "Overall cost", overall);
809
810      // each cost function
811      for (int i = 0; i < costFunctions.size(); i++) {
812        CostFunction costFunction = costFunctions.get(i);
813        String costFunctionName = costFunction.getClass().getSimpleName();
814        double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
815        // TODO: cost function may need a specific description
816        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
817          "The percent of " + costFunctionName, costPercent);
818      }
819    }
820  }
821
822  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
823    float multiplier = costFunction.getMultiplier();
824    if (multiplier > 0) {
825      costFunctions.add(costFunction);
826    }
827  }
828
829  protected String functionCost() {
830    StringBuilder builder = new StringBuilder();
831    for (CostFunction c : costFunctions) {
832      builder.append(c.getClass().getSimpleName());
833      builder.append(" : (");
834      if (c.isNeeded()) {
835        builder.append("multiplier=" + c.getMultiplier());
836        builder.append(", ");
837        double cost = c.cost();
838        builder.append("imbalance=" + cost);
839        if (cost >= minCostNeedBalance) {
840          builder.append(", need balance");
841        }
842      } else {
843        builder.append("not needed");
844      }
845      builder.append("); ");
846    }
847    return builder.toString();
848  }
849
850  @RestrictedApi(explanation = "Should only be called in tests", link = "",
851      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
852  List<CostFunction> getCostFunctions() {
853    return costFunctions;
854  }
855
856  private String totalCostsPerFunc() {
857    StringBuilder builder = new StringBuilder();
858    for (CostFunction c : costFunctions) {
859      if (!c.isNeeded()) {
860        continue;
861      }
862      double cost = c.getMultiplier() * c.cost();
863      if (cost > 0.0) {
864        builder.append(" ");
865        builder.append(c.getClass().getSimpleName());
866        builder.append(" : ");
867        builder.append(cost);
868        builder.append(";");
869      }
870    }
871    if (builder.length() > 0) {
872      builder.deleteCharAt(builder.length() - 1);
873    }
874    return builder.toString();
875  }
876
877  /**
878   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
879   * state.
880   * @param cluster The state of the cluster
881   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
882   */
883  private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
884    List<RegionPlan> plans = new ArrayList<>();
885    for (int regionIndex = 0; regionIndex
886        < cluster.regionIndexToServerIndex.length; regionIndex++) {
887      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
888      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
889
890      if (initialServerIndex != newServerIndex) {
891        RegionInfo region = cluster.regions[regionIndex];
892        ServerName initialServer = cluster.servers[initialServerIndex];
893        ServerName newServer = cluster.servers[newServerIndex];
894
895        if (LOG.isTraceEnabled()) {
896          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
897            + initialServer.getHostname() + " to " + newServer.getHostname());
898        }
899        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
900        plans.add(rp);
901      }
902    }
903    return plans;
904  }
905
906  /**
907   * Store the current region loads.
908   */
909  private void updateRegionLoad() {
910    // We create a new hashmap so that regions that are no longer there are removed.
911    // However we temporarily need the old loads so we can use them to keep the rolling average.
912    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
913    loads = new HashMap<>();
914
915    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
916      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
917        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
918        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
919        if (rLoads == null) {
920          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
921        } else if (rLoads.size() >= numRegionLoadsToRemember) {
922          rLoads.remove();
923        }
924        rLoads.add(new BalancerRegionLoad(rm));
925        loads.put(regionNameAsString, rLoads);
926      });
927    });
928  }
929
930  @RestrictedApi(explanation = "Should only be called in tests", link = "",
931      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
932  void initCosts(BalancerClusterState cluster) {
933    weightsOfGenerators.clear();
934    for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
935      weightsOfGenerators.put(clazz, 0.0);
936    }
937    for (CostFunction c : costFunctions) {
938      c.prepare(cluster);
939      c.updateWeight(weightsOfGenerators);
940    }
941  }
942
943  /**
944   * Update both the costs of costfunctions and the weights of candidate generators
945   */
946  @RestrictedApi(explanation = "Should only be called in tests", link = "",
947      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
948  void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
949    // Reset all the weights to 0
950    for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
951      weightsOfGenerators.put(clazz, 0.0);
952    }
953    for (CostFunction c : costFunctions) {
954      if (c.isNeeded()) {
955        c.postAction(action);
956        c.updateWeight(weightsOfGenerators);
957      }
958    }
959  }
960
961  /**
962   * Get the names of the cost functions
963   */
964  @RestrictedApi(explanation = "Should only be called in tests", link = "",
965      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
966  String[] getCostFunctionNames() {
967    String[] ret = new String[costFunctions.size()];
968    for (int i = 0; i < costFunctions.size(); i++) {
969      CostFunction c = costFunctions.get(i);
970      ret[i] = c.getClass().getSimpleName();
971    }
972
973    return ret;
974  }
975
976  /**
977   * This is the main cost function. It will compute a cost associated with a proposed cluster
978   * state. All different costs will be combined with their multipliers to produce a double cost.
979   * @param cluster      The state of the cluster
980   * @param previousCost the previous cost. This is used as an early out.
981   * @return a double of a cost associated with the proposed cluster state. This cost is an
982   *         aggregate of all individual cost functions.
983   */
984  @RestrictedApi(explanation = "Should only be called in tests", link = "",
985      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
986  double computeCost(BalancerClusterState cluster, double previousCost) {
987    double total = 0;
988
989    for (int i = 0; i < costFunctions.size(); i++) {
990      CostFunction c = costFunctions.get(i);
991      this.tempFunctionCosts[i] = 0.0;
992
993      if (!c.isNeeded()) {
994        continue;
995      }
996
997      Float multiplier = c.getMultiplier();
998      double cost = c.cost();
999
1000      this.tempFunctionCosts[i] = multiplier * cost;
1001      total += this.tempFunctionCosts[i];
1002
1003      if (total > previousCost) {
1004        break;
1005      }
1006    }
1007
1008    return total;
1009  }
1010
1011  /**
1012   * A helper function to compose the attribute name from tablename and costfunction name
1013   */
1014  static String composeAttributeName(String tableName, String costFunctionName) {
1015    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1016  }
1017}