001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.lang.reflect.Constructor;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.ThreadLocalRandom;
030import java.util.function.Supplier;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.HBaseInterfaceAudience;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.RegionMetrics;
036import org.apache.hadoop.hbase.ServerMetrics;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.BalancerDecision;
040import org.apache.hadoop.hbase.client.BalancerRejection;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.master.RackManager;
043import org.apache.hadoop.hbase.master.RegionPlan;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hadoop.hbase.util.Pair;
046import org.apache.hadoop.hbase.util.ReflectionUtils;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * <p>
 * This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will randomly try to
 * mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the new cluster state becomes the plan.
 * It includes cost functions to compute the cost of:
056 * </p>
057 * <ul>
058 * <li>Region Load</li>
059 * <li>Table Load</li>
060 * <li>Data Locality</li>
061 * <li>Memstore Sizes</li>
062 * <li>Storefile Sizes</li>
063 * </ul>
064 * <p>
065 * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best
066 * solution, and 1 is the highest possible cost and the worst solution. The computed costs are
067 * scaled by their respective multipliers:
068 * </p>
069 * <ul>
070 * <li>hbase.master.balancer.stochastic.regionLoadCost</li>
071 * <li>hbase.master.balancer.stochastic.moveCost</li>
072 * <li>hbase.master.balancer.stochastic.tableLoadCost</li>
073 * <li>hbase.master.balancer.stochastic.localityCost</li>
074 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
075 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
076 * </ul>
077 * <p>
 * You can also add custom cost functions by setting the following configuration value:
079 * </p>
080 * <ul>
081 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
082 * </ul>
083 * <p>
 * All custom cost functions need to extend {@link CostFunction}.
085 * </p>
086 * <p>
087 * In addition to the above configurations, the balancer can be tuned by the following configuration
088 * values:
089 * </p>
090 * <ul>
 * <li>hbase.master.balancer.stochastic.maxMoveRegions which controls the maximum number of regions
 * that can be moved in a single invocation of this balancer.</li>
093 * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
094 * regions is multiplied to try and get the number of times the balancer will mutate all
095 * servers.</li>
096 * <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that the
097 * balancer will try and mutate all the servers. The balancer will use the minimum of this value and
098 * the above computation.</li>
099 * </ul>
100 * <p>
101 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the
102 * balancer gets the full picture of all loads on the cluster.
103 * </p>
104 */
105@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
106public class StochasticLoadBalancer extends BaseLoadBalancer {
107
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Coefficient multiplied by numRegions * numServers to derive the walk length
  // (see calculateMaxSteps).
  protected static final String STEPS_PER_REGION_KEY =
    "hbase.master.balancer.stochastic.stepsPerRegion";
  protected static final int DEFAULT_STEPS_PER_REGION = 800;
  // Caps the walk length; when runMaxSteps is true it is used as a lower bound instead.
  protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
  protected static final int DEFAULT_MAX_STEPS = 1000000;
  // When true, balanceTable walks max(maxSteps, calculated steps) instead of min(...).
  protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps";
  protected static final boolean DEFAULT_RUN_MAX_STEPS = false;
  // Wall-clock budget in milliseconds for a single balanceTable walk.
  protected static final String MAX_RUNNING_TIME_KEY =
    "hbase.master.balancer.stochastic.maxRunningTime";
  protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds.
  // Number of recent load samples kept per region by updateRegionLoad.
  protected static final String KEEP_REGION_LOADS =
    "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  protected static final int DEFAULT_KEEP_REGION_LOADS = 15;
  // NOTE(review): not referenced in the visible part of this class; confirm it is still used.
  private static final String TABLE_FUNCTION_SEP = "_";
  // Threshold compared against the weighted-average cost in needsBalance; below it, balancing
  // is skipped.
  protected static final String MIN_COST_NEED_BALANCE_KEY =
    "hbase.master.balancer.stochastic.minCostNeedBalance";
  protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f;
  // Comma-separated class names of extra CostFunction implementations to load.
  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
    "hbase.master.balancer.stochastic.additionalCostFunctions";
  // Function name under which the overall cost is reported to the stochastic metrics.
  public static final String OVERALL_COST_FUNCTION_NAME = "Overall";
130
  // Recent load samples per region, keyed by region name string. The map is replaced (and
  // vanished regions dropped) on every updateRegionLoad call.
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // values are defaults; loadConf overwrites them from the Configuration
  private int maxSteps = DEFAULT_MAX_STEPS;
  private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS;
  private int stepsPerRegion = DEFAULT_STEPS_PER_REGION;
  private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
  private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
  private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
  // Passed into BalancerClusterState by balanceTable. NOTE(review): presumably region name ->
  // (previous server, cache ratio on it); confirm against whoever populates this map.
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>();

  protected List<CostFunction> costFunctions; // FindBugs: Wants this protected;
                                              // IS2_INCONSISTENT_SYNC
  // Sum of the multipliers of the cost functions that are needed for the cluster being
  // balanced. It is recomputed at the start of every balanceTable call and is 0 before the
  // first call.
  private float sumMultiplier;
  // to save and report costs to JMX
  private double curOverallCost = 0d;
  // Scratch per-function costs; presumably filled by computeCost (not visible here) and copied
  // into curFunctionCosts when a state is kept.
  private double[] tempFunctionCosts;
  // Per-function costs of the best state so far, reported via updateStochasticCosts.
  private double[] curFunctionCosts;
  // Selection weight per candidate generator, indexed like candidateGenerators. It is rebuilt
  // by initCosts and updateCostsAndWeightsWithAction.
  private double[] weightsOfGenerators;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  // Also used directly by areSomeRegionReplicasColocated.
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

  // Indexed by GeneratorType ordinal (see createCandidateGenerators).
  protected List<CandidateGenerator> candidateGenerators;
161
  /**
   * Identifies the built-in candidate generators. A constant's ordinal is the index of the
   * matching generator in the list built by {@code createCandidateGenerators()}, and therefore
   * also the index of its entry in the generator weight array.
   */
  public enum GeneratorType {
    RANDOM,
    LOAD,
    LOCALITY,
    RACK
  }
168
  /**
   * Creates a balancer that passes a new {@link MetricsStochasticBalancer} to
   * {@link BaseLoadBalancer} in place of its default MetricsBalancer.
   */
  public StochasticLoadBalancer() {
    super(new MetricsStochasticBalancer());
  }

  /**
   * Creates a balancer that reports through the supplied metrics instance. Restricted to test
   * sources by {@code @RestrictedApi}; this is also why the no-argument constructor calls
   * {@code super(...)} directly instead of delegating here.
   * @param metricsStochasticBalancer metrics sink handed to {@link BaseLoadBalancer}
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) {
    super(metricsStochasticBalancer);
  }
182
183  private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
184    Configuration conf) {
185    try {
186      Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class);
187      return ReflectionUtils.instantiate(clazz.getName(), ctor, conf);
188    } catch (NoSuchMethodException e) {
189      // will try construct with no parameter
190    }
191    return ReflectionUtils.newInstance(clazz);
192  }
193
194  private void loadCustomCostFunctions(Configuration conf) {
195    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
196
197    if (null == functionsNames) {
198      return;
199    }
200    for (String className : functionsNames) {
201      Class<? extends CostFunction> clazz;
202      try {
203        clazz = Class.forName(className).asSubclass(CostFunction.class);
204      } catch (ClassNotFoundException e) {
205        LOG.warn("Cannot load class '{}': {}", className, e.getMessage());
206        continue;
207      }
208      CostFunction func = createCostFunction(clazz, conf);
209      LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
210      costFunctions.add(func);
211    }
212  }
213
214  @RestrictedApi(explanation = "Should only be called in tests", link = "",
215      allowedOnPath = ".*/src/test/.*")
216  List<CandidateGenerator> getCandidateGenerators() {
217    return this.candidateGenerators;
218  }
219
220  protected List<CandidateGenerator> createCandidateGenerators() {
221    List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4);
222    candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator());
223    candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator());
224    candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator);
225    candidateGenerators.add(GeneratorType.RACK.ordinal(),
226      new RegionReplicaRackCandidateGenerator());
227    return candidateGenerators;
228  }
229
230  protected List<CostFunction> createCostFunctions(Configuration conf) {
231    List<CostFunction> costFunctions = new ArrayList<>();
232    addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf));
233    addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf));
234    addCostFunction(costFunctions, new MoveCostFunction(conf, provider));
235    addCostFunction(costFunctions, localityCost);
236    addCostFunction(costFunctions, rackLocalityCost);
237    addCostFunction(costFunctions, new TableSkewCostFunction(conf));
238    addCostFunction(costFunctions, regionReplicaHostCostFunction);
239    addCostFunction(costFunctions, regionReplicaRackCostFunction);
240    addCostFunction(costFunctions, new ReadRequestCostFunction(conf));
241    addCostFunction(costFunctions, new CPRequestCostFunction(conf));
242    addCostFunction(costFunctions, new WriteRequestCostFunction(conf));
243    addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf));
244    addCostFunction(costFunctions, new StoreFileCostFunction(conf));
245    return costFunctions;
246  }
247
248  @Override
249  protected void loadConf(Configuration conf) {
250    super.loadConf(conf);
251    maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS);
252    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION);
253    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME);
254    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS);
255
256    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS);
257    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE);
258    localityCandidateGenerator = new LocalityBasedCandidateGenerator();
259    localityCost = new ServerLocalityCostFunction(conf);
260    rackLocalityCost = new RackLocalityCostFunction(conf);
261
262    this.candidateGenerators = createCandidateGenerators();
263
264    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
265    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
266    this.costFunctions = createCostFunctions(conf);
267    loadCustomCostFunctions(conf);
268
269    curFunctionCosts = new double[costFunctions.size()];
270    tempFunctionCosts = new double[costFunctions.size()];
271
272    LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps
273      + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable="
274      + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames())
275      + " , sum of multiplier of cost functions = " + sumMultiplier + " etc.");
276  }
277
278  @Override
279  public void updateClusterMetrics(ClusterMetrics st) {
280    super.updateClusterMetrics(st);
281    updateRegionLoad();
282
283    // update metrics size
284    try {
285      // by-table or ensemble mode
286      int tablesCount = isByTable ? provider.getNumberOfTables() : 1;
287      int functionsCount = getCostFunctionNames().length;
288
289      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
290    } catch (Exception e) {
291      LOG.error("failed to get the size of all tables", e);
292    }
293  }
294
295  private void updateBalancerTableLoadInfo(TableName tableName,
296    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
297    RegionHDFSBlockLocationFinder finder = null;
298    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
299      finder = this.regionFinder;
300    }
301    BalancerClusterState cluster =
302      new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
303
304    initCosts(cluster);
305    curOverallCost = computeCost(cluster, Double.MAX_VALUE);
306    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
307    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
308  }
309
310  @Override
311  public void
312    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
313    if (isByTable) {
314      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
315        updateBalancerTableLoadInfo(tableName, loadOfOneTable);
316      });
317    } else {
318      updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME,
319        toEnsumbleTableLoad(loadOfAllTable));
320    }
321  }
322
323  /**
324   * Update the number of metrics that are reported to JMX
325   */
326  @RestrictedApi(explanation = "Should only be called in tests", link = "",
327      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
328  void updateMetricsSize(int size) {
329    if (metricsBalancer instanceof MetricsStochasticBalancer) {
330      ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
331    }
332  }
333
334  private boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
335    regionReplicaHostCostFunction.prepare(c);
336    return (Math.abs(regionReplicaHostCostFunction.cost()) > CostFunction.COST_EPSILON);
337  }
338
339  private String getBalanceReason(double total, double sumMultiplier) {
340    if (total <= 0) {
341      return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
342    } else if (sumMultiplier <= 0) {
343      return "sumMultiplier = " + sumMultiplier + " <= 0";
344    } else if ((total / sumMultiplier) < minCostNeedBalance) {
345      return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = "
346        + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
347    } else {
348      return "";
349    }
350  }
351
352  @RestrictedApi(explanation = "Should only be called in tests", link = "",
353      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
354  boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
355    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
356    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
357      LOG.info(
358        "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
359      sendRejectionReasonToRingBuffer(() -> "The number of RegionServers " + cs.getNumServers()
360        + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
361      return false;
362    }
363    if (areSomeRegionReplicasColocated(cluster)) {
364      LOG.info("Running balancer because at least one server hosts replicas of the same region."
365        + " function cost={}", functionCost());
366      return true;
367    }
368
369    if (idleRegionServerExist(cluster)) {
370      LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}",
371        functionCost());
372      return true;
373    }
374
375    if (sloppyRegionServerExist(cs)) {
376      LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}",
377        functionCost());
378      return true;
379    }
380
381    double total = 0.0;
382    for (CostFunction c : costFunctions) {
383      if (!c.isNeeded()) {
384        LOG.trace("{} not needed", c.getClass().getSimpleName());
385        continue;
386      }
387      total += c.cost() * c.getMultiplier();
388    }
389    boolean balanced = (total / sumMultiplier < minCostNeedBalance);
390
391    if (balanced) {
392      final double calculatedTotal = total;
393      sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, sumMultiplier),
394        costFunctions);
395      LOG.info(
396        "{} - skipping load balancing because weighted average imbalance={} <= "
397          + "threshold({}). If you want more aggressive balancing, either lower "
398          + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative "
399          + "multiplier(s) of the specific cost function(s). functionCost={}",
400        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier,
401        minCostNeedBalance, minCostNeedBalance, functionCost());
402    } else {
403      LOG.info("{} - Calculating plan. may take up to {}ms to complete.",
404        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime);
405    }
406    return !balanced;
407  }
408
409  @RestrictedApi(explanation = "Should only be called in tests", link = "",
410      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
411  BalanceAction nextAction(BalancerClusterState cluster) {
412    return getRandomGenerator().generate(cluster);
413  }
414
415  /**
416   * Select the candidate generator to use based on the cost of cost functions. The chance of
417   * selecting a candidate generator is propotional to the share of cost of all cost functions among
418   * all cost functions that benefit from it.
419   */
420  protected CandidateGenerator getRandomGenerator() {
421    double sum = 0;
422    for (int i = 0; i < weightsOfGenerators.length; i++) {
423      sum += weightsOfGenerators[i];
424      weightsOfGenerators[i] = sum;
425    }
426    if (sum == 0) {
427      return candidateGenerators.get(0);
428    }
429    for (int i = 0; i < weightsOfGenerators.length; i++) {
430      weightsOfGenerators[i] /= sum;
431    }
432    double rand = ThreadLocalRandom.current().nextDouble();
433    for (int i = 0; i < weightsOfGenerators.length; i++) {
434      if (rand <= weightsOfGenerators[i]) {
435        return candidateGenerators.get(i);
436      }
437    }
438    return candidateGenerators.get(candidateGenerators.size() - 1);
439  }
440
441  @RestrictedApi(explanation = "Should only be called in tests", link = "",
442      allowedOnPath = ".*/src/test/.*")
443  void setRackManager(RackManager rackManager) {
444    this.rackManager = rackManager;
445  }
446
447  private long calculateMaxSteps(BalancerClusterState cluster) {
448    return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers;
449  }
450
  /**
   * Given the cluster state this will try and approach an optimal balance. This should always
   * approach the optimal state given enough steps.
   * <p>
   * Builds a {@link BalancerClusterState} for {@code loadOfOneTable} and prepares the cost
   * functions. It returns {@code null} if no needed cost function has a positive multiplier, or
   * if {@code needsBalance} declines. Otherwise it performs a random walk: each proposed action
   * is kept only if it lowers the overall cost and is undone otherwise. The walk stops after
   * the computed number of steps, or once {@code maxRunningTime} milliseconds have elapsed.
   * @param tableName      table being balanced; used for cost metrics and log messages
   * @param loadOfOneTable current server-to-regions assignment to balance
   * @return the region moves that reach the best state found, or {@code null} when balancing
   *         was skipped or no cheaper state was found
   */
  @Override
  protected List<RegionPlan> balanceTable(TableName tableName,
    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
    // On clusters with lots of HFileLinks or lots of reference files,
    // instantiating the storefile infos can be quite expensive.
    // Allow turning this feature off if the locality cost is not going to
    // be used in any computations.
    RegionHDFSBlockLocationFinder finder = null;
    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
      finder = this.regionFinder;
    }

    // The clusterState that is given to this method contains the state
    // of all the regions in the table(s) (that's true today)
    // Keep track of servers to iterate through them.
    BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder,
      rackManager, regionCacheRatioOnOldServerMap);

    // Taken before initCosts, so cost preparation and needsBalance also count toward
    // maxRunningTime.
    long startTime = EnvironmentEdgeManager.currentTime();

    initCosts(cluster);

    // Normalizer for the weighted-average imbalance: only functions needed for this cluster
    // contribute.
    sumMultiplier = 0;
    for (CostFunction c : costFunctions) {
      if (c.isNeeded()) {
        sumMultiplier += c.getMultiplier();
      }
    }
    if (sumMultiplier <= 0) {
      LOG.error("At least one cost function needs a multiplier > 0. For example, set "
        + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default");
      return null;
    }

    // Baseline cost of the current assignment; published to metrics before any move is tried.
    double currentCost = computeCost(cluster, Double.MAX_VALUE);
    curOverallCost = currentCost;
    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
    double initCost = currentCost;
    double newCost;

    if (!needsBalance(tableName, cluster)) {
      return null;
    }

    // With runMaxSteps, maxSteps is a floor on the walk length; otherwise it is a cap.
    long computedMaxSteps;
    if (runMaxSteps) {
      computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster));
    } else {
      long calculatedMaxSteps = calculateMaxSteps(cluster);
      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
      if (calculatedMaxSteps > maxSteps) {
        LOG.warn(
          "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
            + "(This config change does not require service restart)",
          calculatedMaxSteps, maxSteps);
      }
    }
    LOG.info(
      "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, "
        + "functionCost={} computedMaxSteps={}",
      currentCost / sumMultiplier, functionCost(), computedMaxSteps);

    // Captured before the walk for the balancer decision ring buffer.
    final String initFunctionTotalCosts = totalCostsPerFunc();
    // Perform a stochastic walk to see if we can get a good fit.
    long step;

    for (step = 0; step < computedMaxSteps; step++) {
      BalanceAction action = nextAction(cluster);

      // Nothing to apply; the generator weights are left as they are.
      if (action.getType() == BalanceAction.Type.NULL) {
        continue;
      }

      cluster.doAction(action);
      updateCostsAndWeightsWithAction(cluster, action);

      // NOTE(review): the second argument looks like an early-exit bound for computeCost
      // (not visible here); confirm.
      newCost = computeCost(cluster, currentCost);

      // Should this be kept?
      if (newCost < currentCost) {
        currentCost = newCost;

        // save for JMX
        curOverallCost = currentCost;
        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
      } else {
        // Put things back the way they were before.
        // TODO: undo by remembering old values
        BalanceAction undoAction = action.undoAction();
        cluster.doAction(undoAction);
        updateCostsAndWeightsWithAction(cluster, undoAction);
      }

      // Wall-clock budget check, measured from startTime above.
      if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) {
        break;
      }
    }
    long endTime = EnvironmentEdgeManager.currentTime();

    metricsBalancer.balanceCluster(endTime - startTime);

    // Only emit plans if the walk found a strictly cheaper state than the starting one.
    if (initCost > currentCost) {
      updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
      List<RegionPlan> plans = createRegionPlans(cluster);
      LOG.info(
        "Finished computing new moving plan. Computation took {} ms"
          + " to try {} different iterations.  Found a solution that moves "
          + "{} regions; Going from a computed imbalance of {}"
          + " to a new imbalance of {}. funtionCost={}",
        endTime - startTime, step, plans.size(), initCost / sumMultiplier,
        currentCost / sumMultiplier, functionCost());
      sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
      return plans;
    }
    LOG.info(
      "Could not find a better moving plan.  Tried {} different configurations in "
        + "{} ms, and did not find anything with an imbalance score less than {}",
      step, endTime - startTime, initCost / sumMultiplier);
    return null;
  }
578
579  protected void sendRejectionReasonToRingBuffer(Supplier<String> reason,
580    List<CostFunction> costFunctions) {
581    provider.recordBalancerRejection(() -> {
582      BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get());
583      if (costFunctions != null) {
584        for (CostFunction c : costFunctions) {
585          if (!c.isNeeded()) {
586            continue;
587          }
588          builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
589        }
590      }
591      return builder.build();
592    });
593  }
594
595  private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
596    double initCost, String initFunctionTotalCosts, long step) {
597    provider.recordBalancerDecision(() -> {
598      List<String> regionPlans = new ArrayList<>();
599      for (RegionPlan plan : plans) {
600        regionPlans
601          .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
602            + " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
603      }
604      return new BalancerDecision.Builder().setInitTotalCost(initCost)
605        .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost)
606        .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step)
607        .setRegionPlans(regionPlans).build();
608    });
609  }
610
611  /**
612   * update costs to JMX
613   */
614  private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) {
615    if (tableName == null) {
616      return;
617    }
618
619    // check if the metricsBalancer is MetricsStochasticBalancer before casting
620    if (metricsBalancer instanceof MetricsStochasticBalancer) {
621      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
622      // overall cost
623      balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME,
624        "Overall cost", overall);
625
626      // each cost function
627      for (int i = 0; i < costFunctions.size(); i++) {
628        CostFunction costFunction = costFunctions.get(i);
629        String costFunctionName = costFunction.getClass().getSimpleName();
630        double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
631        // TODO: cost function may need a specific description
632        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
633          "The percent of " + costFunctionName, costPercent);
634      }
635    }
636  }
637
638  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
639    float multiplier = costFunction.getMultiplier();
640    if (multiplier > 0) {
641      costFunctions.add(costFunction);
642    }
643  }
644
645  protected String functionCost() {
646    StringBuilder builder = new StringBuilder();
647    for (CostFunction c : costFunctions) {
648      builder.append(c.getClass().getSimpleName());
649      builder.append(" : (");
650      if (c.isNeeded()) {
651        builder.append("multiplier=" + c.getMultiplier());
652        builder.append(", ");
653        double cost = c.cost();
654        builder.append("imbalance=" + cost);
655        if (cost >= minCostNeedBalance) {
656          builder.append(", need balance");
657        }
658      } else {
659        builder.append("not needed");
660      }
661      builder.append("); ");
662    }
663    return builder.toString();
664  }
665
666  @RestrictedApi(explanation = "Should only be called in tests", link = "",
667      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
668  List<CostFunction> getCostFunctions() {
669    return costFunctions;
670  }
671
672  private String totalCostsPerFunc() {
673    StringBuilder builder = new StringBuilder();
674    for (CostFunction c : costFunctions) {
675      if (!c.isNeeded()) {
676        continue;
677      }
678      double cost = c.getMultiplier() * c.cost();
679      if (cost > 0.0) {
680        builder.append(" ");
681        builder.append(c.getClass().getSimpleName());
682        builder.append(" : ");
683        builder.append(cost);
684        builder.append(";");
685      }
686    }
687    if (builder.length() > 0) {
688      builder.deleteCharAt(builder.length() - 1);
689    }
690    return builder.toString();
691  }
692
693  /**
694   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
695   * state.
696   * @param cluster The state of the cluster
697   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
698   */
699  private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
700    List<RegionPlan> plans = new ArrayList<>();
701    for (int regionIndex = 0; regionIndex
702        < cluster.regionIndexToServerIndex.length; regionIndex++) {
703      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
704      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
705
706      if (initialServerIndex != newServerIndex) {
707        RegionInfo region = cluster.regions[regionIndex];
708        ServerName initialServer = cluster.servers[initialServerIndex];
709        ServerName newServer = cluster.servers[newServerIndex];
710
711        if (LOG.isTraceEnabled()) {
712          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
713            + initialServer.getHostname() + " to " + newServer.getHostname());
714        }
715        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
716        plans.add(rp);
717      }
718    }
719    return plans;
720  }
721
722  /**
723   * Store the current region loads.
724   */
725  private void updateRegionLoad() {
726    // We create a new hashmap so that regions that are no longer there are removed.
727    // However we temporarily need the old loads so we can use them to keep the rolling average.
728    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
729    loads = new HashMap<>();
730
731    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
732      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
733        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
734        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
735        if (rLoads == null) {
736          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
737        } else if (rLoads.size() >= numRegionLoadsToRemember) {
738          rLoads.remove();
739        }
740        rLoads.add(new BalancerRegionLoad(rm));
741        loads.put(regionNameAsString, rLoads);
742      });
743    });
744  }
745
746  @RestrictedApi(explanation = "Should only be called in tests", link = "",
747      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
748  void initCosts(BalancerClusterState cluster) {
749    // Initialize the weights of generator every time
750    weightsOfGenerators = new double[this.candidateGenerators.size()];
751    for (CostFunction c : costFunctions) {
752      c.prepare(cluster);
753      c.updateWeight(weightsOfGenerators);
754    }
755  }
756
757  /**
758   * Update both the costs of costfunctions and the weights of candidate generators
759   */
760  @RestrictedApi(explanation = "Should only be called in tests", link = "",
761      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
762  void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
763    // Reset all the weights to 0
764    for (int i = 0; i < weightsOfGenerators.length; i++) {
765      weightsOfGenerators[i] = 0;
766    }
767    for (CostFunction c : costFunctions) {
768      if (c.isNeeded()) {
769        c.postAction(action);
770        c.updateWeight(weightsOfGenerators);
771      }
772    }
773  }
774
775  /**
776   * Get the names of the cost functions
777   */
778  @RestrictedApi(explanation = "Should only be called in tests", link = "",
779      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
780  String[] getCostFunctionNames() {
781    String[] ret = new String[costFunctions.size()];
782    for (int i = 0; i < costFunctions.size(); i++) {
783      CostFunction c = costFunctions.get(i);
784      ret[i] = c.getClass().getSimpleName();
785    }
786
787    return ret;
788  }
789
790  /**
791   * This is the main cost function. It will compute a cost associated with a proposed cluster
792   * state. All different costs will be combined with their multipliers to produce a double cost.
793   * @param cluster      The state of the cluster
794   * @param previousCost the previous cost. This is used as an early out.
795   * @return a double of a cost associated with the proposed cluster state. This cost is an
796   *         aggregate of all individual cost functions.
797   */
798  @RestrictedApi(explanation = "Should only be called in tests", link = "",
799      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
800  double computeCost(BalancerClusterState cluster, double previousCost) {
801    double total = 0;
802
803    for (int i = 0; i < costFunctions.size(); i++) {
804      CostFunction c = costFunctions.get(i);
805      this.tempFunctionCosts[i] = 0.0;
806
807      if (!c.isNeeded()) {
808        continue;
809      }
810
811      Float multiplier = c.getMultiplier();
812      double cost = c.cost();
813
814      this.tempFunctionCosts[i] = multiplier * cost;
815      total += this.tempFunctionCosts[i];
816
817      if (total > previousCost) {
818        break;
819      }
820    }
821
822    return total;
823  }
824
825  /**
826   * A helper function to compose the attribute name from tablename and costfunction name
827   */
828  static String composeAttributeName(String tableName, String costFunctionName) {
829    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
830  }
831}