001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.lang.reflect.Constructor;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.ThreadLocalRandom;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.ClusterMetrics;
032import org.apache.hadoop.hbase.HBaseInterfaceAudience;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.RegionMetrics;
035import org.apache.hadoop.hbase.ServerMetrics;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.BalancerDecision;
039import org.apache.hadoop.hbase.client.BalancerRejection;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.master.RackManager;
042import org.apache.hadoop.hbase.master.RegionPlan;
043import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
044import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
045import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.hbase.util.ReflectionUtils;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * <p>
054 * This is a best effort load balancer. Given a Cost function F(C) =&gt; x It will randomly try and
055 * mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the new cluster state becomes the plan.
056 * It includes costs functions to compute the cost of:
057 * </p>
058 * <ul>
059 * <li>Region Load</li>
060 * <li>Table Load</li>
061 * <li>Data Locality</li>
062 * <li>Memstore Sizes</li>
063 * <li>Storefile Sizes</li>
064 * </ul>
065 * <p>
066 * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best
067 * solution, and 1 is the highest possible cost and the worst solution. The computed costs are
068 * scaled by their respective multipliers:
069 * </p>
070 * <ul>
071 * <li>hbase.master.balancer.stochastic.regionLoadCost</li>
072 * <li>hbase.master.balancer.stochastic.moveCost</li>
073 * <li>hbase.master.balancer.stochastic.tableLoadCost</li>
074 * <li>hbase.master.balancer.stochastic.localityCost</li>
075 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
076 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
077 * </ul>
078 * <p>
 * You can also add custom cost functions by setting the following configuration value:
080 * </p>
081 * <ul>
082 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
083 * </ul>
084 * <p>
 * All custom cost functions need to extend {@link CostFunction}.
086 * </p>
087 * <p>
088 * In addition to the above configurations, the balancer can be tuned by the following configuration
089 * values:
090 * </p>
091 * <ul>
092 * <li>hbase.master.balancer.stochastic.maxMoveRegions which controls what the max number of regions
093 * that can be moved in a single invocation of this balancer.</li>
094 * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
095 * regions is multiplied to try and get the number of times the balancer will mutate all
096 * servers.</li>
097 * <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that the
098 * balancer will try and mutate all the servers. The balancer will use the minimum of this value and
099 * the above computation.</li>
100 * </ul>
101 * <p>
102 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the
103 * balancer gets the full picture of all loads on the cluster.
104 * </p>
105 */
106@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
107public class StochasticLoadBalancer extends BaseLoadBalancer {
108
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Configuration keys and their defaults. The values are picked up in loadConf(Configuration).

  // Factor that calculateMaxSteps(...) multiplies with the region and server counts.
  protected static final String STEPS_PER_REGION_KEY =
    "hbase.master.balancer.stochastic.stepsPerRegion";
  protected static final int DEFAULT_STEPS_PER_REGION = 800;
  // Step bound that balanceTable(...) combines with the calculated step count.
  protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
  protected static final int DEFAULT_MAX_STEPS = 1000000;
  // When true, balanceTable(...) takes the larger of maxSteps and the calculated step count;
  // otherwise it takes the smaller of the two.
  protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps";
  protected static final boolean DEFAULT_RUN_MAX_STEPS = false;
  // Time budget, in milliseconds, that balanceTable(...) checks while walking.
  protected static final String MAX_RUNNING_TIME_KEY =
    "hbase.master.balancer.stochastic.maxRunningTime";
  protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds.
  // Upper bound on the per-region load history kept by updateRegionLoad().
  protected static final String KEEP_REGION_LOADS =
    "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  protected static final int DEFAULT_KEEP_REGION_LOADS = 15;
  // NOTE(review): not referenced in the visible part of this file; confirm where it is used.
  private static final String TABLE_FUNCTION_SEP = "_";
  // Threshold that needsBalance(...) compares the weighted average cost against.
  protected static final String MIN_COST_NEED_BALANCE_KEY =
    "hbase.master.balancer.stochastic.minCostNeedBalance";
  protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f;
  // Comma separated class names of extra cost functions; read by loadCustomCostFunctions(...).
  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
    "hbase.master.balancer.stochastic.additionalCostFunctions";
  // Function name under which updateStochasticCosts(...) reports the overall cost.
  public static final String OVERALL_COST_FUNCTION_NAME = "Overall";

  // Recent load samples per region, keyed by region name string. Rebuilt on every
  // updateRegionLoad() call and handed to each BalancerClusterState created in this class.
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // values are defaults
  private int maxSteps = DEFAULT_MAX_STEPS;
  private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS;
  private int stepsPerRegion = DEFAULT_STEPS_PER_REGION;
  private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
  private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
  private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
  // Whether balancer decisions / rejections are pushed to the named queue recorder; both flags
  // are read from the configuration in loadConf(...).
  private boolean isBalancerDecisionRecording = false;
  private boolean isBalancerRejectionRecording = false;

  // Generators that propose balance actions; assigned in loadConf(...) from
  // createCandidateGenerators().
  protected List<CandidateGenerator> candidateGenerators;
145
  /**
   * Kinds of candidate generator. {@link #createCandidateGenerators()} uses each constant's
   * {@code ordinal()} as the list index of the matching generator, so the declaration order here
   * fixes the positions in that list.
   */
  public enum GeneratorType {
    RANDOM,
    LOAD,
    LOCALITY,
    RACK
  }
152
  // Selection weight per candidate generator; allocated in initCosts(...) with one slot per entry
  // of candidateGenerators and consumed by getRandomGenerator().
  private double[] weightsOfGenerators;
  private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
  // Sum of the multipliers of the cost functions that are currently needed. There is no explicit
  // initializer; balanceTable(...) recomputes it before it is used as a divisor.
  private float sumMultiplier;
  // to save and report costs to JMX
  private double curOverallCost = 0d;
  // Scratch and "current best" per-function costs; both are sized to costFunctions in
  // loadConf(...), and tempFunctionCosts is copied into curFunctionCosts when a cost is accepted.
  private double[] tempFunctionCosts;
  private double[] curFunctionCosts;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

  /**
   * Used to add balancer decision and rejection history to the ring buffer. Only obtained in
   * {@code loadConf} when decision or rejection recording is enabled, so it may stay null.
   */
  NamedQueueRecorder namedQueueRecorder;
174
  /**
   * Default constructor. Passes a new {@link MetricsStochasticBalancer} to the
   * {@link BaseLoadBalancer} constructor so that it replaces the base class' default
   * MetricsBalancer.
   */
  public StochasticLoadBalancer() {
    super(new MetricsStochasticBalancer());
  }
182
  /**
   * Constructor that hands the given metrics object to the {@link BaseLoadBalancer} constructor
   * instead of creating a new one. The annotation restricts callers to test sources.
   * @param metricsStochasticBalancer the metrics object to pass to the superclass
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) {
    super(metricsStochasticBalancer);
  }
188
189  private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
190    Configuration conf) {
191    try {
192      Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class);
193      return ReflectionUtils.instantiate(clazz.getName(), ctor, conf);
194    } catch (NoSuchMethodException e) {
195      // will try construct with no parameter
196    }
197    return ReflectionUtils.newInstance(clazz);
198  }
199
200  private void loadCustomCostFunctions(Configuration conf) {
201    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
202
203    if (null == functionsNames) {
204      return;
205    }
206    for (String className : functionsNames) {
207      Class<? extends CostFunction> clazz;
208      try {
209        clazz = Class.forName(className).asSubclass(CostFunction.class);
210      } catch (ClassNotFoundException e) {
211        LOG.warn("Cannot load class '{}': {}", className, e.getMessage());
212        continue;
213      }
214      CostFunction func = createCostFunction(clazz, conf);
215      LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
216      costFunctions.add(func);
217    }
218  }
219
  /**
   * Test-only accessor for the candidate generator list.
   * @return the list held in {@code candidateGenerators} itself, not a copy
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  List<CandidateGenerator> getCandidateGenerators() {
    return this.candidateGenerators;
  }
225
226  protected List<CandidateGenerator> createCandidateGenerators() {
227    List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4);
228    candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator());
229    candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator());
230    candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator);
231    candidateGenerators.add(GeneratorType.RACK.ordinal(),
232      new RegionReplicaRackCandidateGenerator());
233    return candidateGenerators;
234  }
235
236  @Override
237  protected void loadConf(Configuration conf) {
238    super.loadConf(conf);
239    maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS);
240    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION);
241    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME);
242    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS);
243
244    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS);
245    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE);
246    localityCandidateGenerator = new LocalityBasedCandidateGenerator();
247    localityCost = new ServerLocalityCostFunction(conf);
248    rackLocalityCost = new RackLocalityCostFunction(conf);
249
250    this.candidateGenerators = createCandidateGenerators();
251
252    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
253    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
254    costFunctions = new ArrayList<>();
255    addCostFunction(new RegionCountSkewCostFunction(conf));
256    addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
257    addCostFunction(new MoveCostFunction(conf));
258    addCostFunction(localityCost);
259    addCostFunction(rackLocalityCost);
260    addCostFunction(new TableSkewCostFunction(conf));
261    addCostFunction(regionReplicaHostCostFunction);
262    addCostFunction(regionReplicaRackCostFunction);
263    addCostFunction(new ReadRequestCostFunction(conf));
264    addCostFunction(new WriteRequestCostFunction(conf));
265    addCostFunction(new MemStoreSizeCostFunction(conf));
266    addCostFunction(new StoreFileCostFunction(conf));
267    loadCustomCostFunctions(conf);
268
269    curFunctionCosts = new double[costFunctions.size()];
270    tempFunctionCosts = new double[costFunctions.size()];
271
272    isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
273      BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
274    isBalancerRejectionRecording =
275      conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
276        BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
277
278    if (
279      this.namedQueueRecorder == null
280        && (isBalancerDecisionRecording || isBalancerRejectionRecording)
281    ) {
282      this.namedQueueRecorder = NamedQueueRecorder.getInstance(conf);
283    }
284
285    LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps
286      + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable="
287      + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames())
288      + " , sum of multiplier of cost functions = " + sumMultiplier + " etc.");
289  }
290
291  @Override
292  public synchronized void updateClusterMetrics(ClusterMetrics st) {
293    super.updateClusterMetrics(st);
294    updateRegionLoad();
295
296    // update metrics size
297    try {
298      // by-table or ensemble mode
299      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
300      int functionsCount = getCostFunctionNames().length;
301
302      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
303    } catch (Exception e) {
304      LOG.error("failed to get the size of all tables", e);
305    }
306  }
307
308  private void updateBalancerTableLoadInfo(TableName tableName,
309    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
310    RegionLocationFinder finder = null;
311    if (
312      (this.localityCost != null && this.localityCost.getMultiplier() > 0)
313        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)
314    ) {
315      finder = this.regionFinder;
316    }
317    BalancerClusterState cluster =
318      new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
319
320    initCosts(cluster);
321    curOverallCost = computeCost(cluster, Double.MAX_VALUE);
322    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
323    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
324  }
325
326  @Override
327  public void
328    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
329    if (isByTable) {
330      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
331        updateBalancerTableLoadInfo(tableName, loadOfOneTable);
332      });
333    } else {
334      updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME,
335        toEnsumbleTableLoad(loadOfAllTable));
336    }
337  }
338
339  /**
340   * Update the number of metrics that are reported to JMX
341   */
342  @RestrictedApi(explanation = "Should only be called in tests", link = "",
343      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
344  void updateMetricsSize(int size) {
345    if (metricsBalancer instanceof MetricsStochasticBalancer) {
346      ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
347    }
348  }
349
350  private boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
351    regionReplicaHostCostFunction.prepare(c);
352    if (Math.abs(regionReplicaHostCostFunction.cost()) > CostFunction.COST_EPSILON) {
353      return true;
354    }
355    return (Math.abs(regionReplicaHostCostFunction.cost()) > CostFunction.COST_EPSILON);
356  }
357
358  @RestrictedApi(explanation = "Should only be called in tests", link = "",
359      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
360  boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
361    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
362    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
363      LOG.info(
364        "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
365      sendRejectionReasonToRingBuffer("The number of RegionServers " + cs.getNumServers()
366        + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
367      return false;
368    }
369    if (areSomeRegionReplicasColocated(cluster)) {
370      LOG.info("Running balancer because at least one server hosts replicas of the same region."
371        + " function cost={}", functionCost());
372      return true;
373    }
374
375    if (idleRegionServerExist(cluster)) {
376      LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}",
377        functionCost());
378      return true;
379    }
380
381    if (sloppyRegionServerExist(cs)) {
382      LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}",
383        functionCost());
384      return true;
385    }
386
387    double total = 0.0;
388    for (CostFunction c : costFunctions) {
389      if (!c.isNeeded()) {
390        LOG.trace("{} not needed", c.getClass().getSimpleName());
391        continue;
392      }
393      total += c.cost() * c.getMultiplier();
394    }
395
396    boolean balanced = (total / sumMultiplier < minCostNeedBalance);
397    if (balanced) {
398      if (isBalancerRejectionRecording) {
399        String reason = "";
400        if (total <= 0) {
401          reason =
402            "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
403        } else if (sumMultiplier <= 0) {
404          reason = "sumMultiplier = " + sumMultiplier + " <= 0";
405        } else if ((total / sumMultiplier) < minCostNeedBalance) {
406          reason =
407            "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = "
408              + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
409        }
410        sendRejectionReasonToRingBuffer(reason, costFunctions);
411      }
412      LOG.info(
413        "{} - skipping load balancing because weighted average imbalance={} <= "
414          + "threshold({}). If you want more aggressive balancing, either lower "
415          + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative "
416          + "multiplier(s) of the specific cost function(s). functionCost={}",
417        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier,
418        minCostNeedBalance, minCostNeedBalance, functionCost());
419    } else {
420      LOG.info("{} - Calculating plan. may take up to {}ms to complete.",
421        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime);
422    }
423    return !balanced;
424  }
425
  /**
   * Produces the next balance action to try: a generator is chosen with
   * {@link #getRandomGenerator()} and asked to generate an action for the cluster.
   * @param cluster the cluster state the action is generated for
   * @return the action returned by the chosen generator
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
  BalanceAction nextAction(BalancerClusterState cluster) {
    return getRandomGenerator().generate(cluster);
  }
431
432  /**
433   * Select the candidate generator to use based on the cost of cost functions. The chance of
434   * selecting a candidate generator is propotional to the share of cost of all cost functions among
435   * all cost functions that benefit from it.
436   */
437  protected CandidateGenerator getRandomGenerator() {
438    double sum = 0;
439    for (int i = 0; i < weightsOfGenerators.length; i++) {
440      sum += weightsOfGenerators[i];
441      weightsOfGenerators[i] = sum;
442    }
443    if (sum == 0) {
444      return candidateGenerators.get(0);
445    }
446    for (int i = 0; i < weightsOfGenerators.length; i++) {
447      weightsOfGenerators[i] /= sum;
448    }
449    double rand = ThreadLocalRandom.current().nextDouble();
450    for (int i = 0; i < weightsOfGenerators.length; i++) {
451      if (rand <= weightsOfGenerators[i]) {
452        return candidateGenerators.get(i);
453      }
454    }
455    return candidateGenerators.get(candidateGenerators.size() - 1);
456  }
457
  /**
   * Test-only setter that replaces the rack manager passed to the {@link BalancerClusterState}
   * instances created by this class.
   * @param rackManager the rack manager to use
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }
463
464  private long calculateMaxSteps(BalancerClusterState cluster) {
465    return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers;
466  }
467
468  /**
469   * Given the cluster state this will try and approach an optimal balance. This should always
470   * approach the optimal state given enough steps.
471   */
472  @Override
473  protected List<RegionPlan> balanceTable(TableName tableName,
474    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
475    List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable);
476    if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
477      return plans;
478    }
479
480    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
481      if (loadOfOneTable.size() <= 2) {
482        return null;
483      }
484      loadOfOneTable = new HashMap<>(loadOfOneTable);
485      loadOfOneTable.remove(masterServerName);
486    }
487
488    // On clusters with lots of HFileLinks or lots of reference files,
489    // instantiating the storefile infos can be quite expensive.
490    // Allow turning this feature off if the locality cost is not going to
491    // be used in any computations.
492    RegionLocationFinder finder = null;
493    if (
494      (this.localityCost != null && this.localityCost.getMultiplier() > 0)
495        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)
496    ) {
497      finder = this.regionFinder;
498    }
499
500    // The clusterState that is given to this method contains the state
501    // of all the regions in the table(s) (that's true today)
502    // Keep track of servers to iterate through them.
503    BalancerClusterState cluster =
504      new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
505
506    long startTime = EnvironmentEdgeManager.currentTime();
507
508    initCosts(cluster);
509    sumMultiplier = 0;
510    for (CostFunction c : costFunctions) {
511      if (c.isNeeded()) {
512        sumMultiplier += c.getMultiplier();
513      }
514    }
515    if (sumMultiplier <= 0) {
516      LOG.error("At least one cost function needs a multiplier > 0. For example, set "
517        + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default");
518      return null;
519    }
520
521    double currentCost = computeCost(cluster, Double.MAX_VALUE);
522    curOverallCost = currentCost;
523    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
524    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
525    double initCost = currentCost;
526    double newCost;
527
528    if (!needsBalance(tableName, cluster)) {
529      return null;
530    }
531
532    long computedMaxSteps;
533    if (runMaxSteps) {
534      computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster));
535    } else {
536      long calculatedMaxSteps = calculateMaxSteps(cluster);
537      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
538      if (calculatedMaxSteps > maxSteps) {
539        LOG.warn(
540          "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
541            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
542            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
543            + "(This config change does not require service restart)",
544          calculatedMaxSteps, maxSteps);
545      }
546    }
547    LOG.info(
548      "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, "
549        + "functionCost={} computedMaxSteps={}",
550      currentCost / sumMultiplier, functionCost(), computedMaxSteps);
551
552    final String initFunctionTotalCosts = totalCostsPerFunc();
553    // Perform a stochastic walk to see if we can get a good fit.
554    long step;
555
556    for (step = 0; step < computedMaxSteps; step++) {
557      BalanceAction action = nextAction(cluster);
558
559      if (action.getType() == BalanceAction.Type.NULL) {
560        continue;
561      }
562
563      cluster.doAction(action);
564      updateCostsAndWeightsWithAction(cluster, action);
565
566      newCost = computeCost(cluster, currentCost);
567
568      // Should this be kept?
569      if (newCost < currentCost) {
570        currentCost = newCost;
571
572        // save for JMX
573        curOverallCost = currentCost;
574        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
575      } else {
576        // Put things back the way they were before.
577        // TODO: undo by remembering old values
578        BalanceAction undoAction = action.undoAction();
579        cluster.doAction(undoAction);
580        updateCostsAndWeightsWithAction(cluster, undoAction);
581      }
582
583      if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) {
584        break;
585      }
586    }
587    long endTime = EnvironmentEdgeManager.currentTime();
588
589    metricsBalancer.balanceCluster(endTime - startTime);
590
591    if (initCost > currentCost) {
592      updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
593      plans = createRegionPlans(cluster);
594      LOG.info(
595        "Finished computing new moving plan. Computation took {} ms"
596          + " to try {} different iterations.  Found a solution that moves "
597          + "{} regions; Going from a computed imbalance of {}"
598          + " to a new imbalance of {}. funtionCost={}",
599        endTime - startTime, step, plans.size(), initCost / sumMultiplier,
600        currentCost / sumMultiplier, functionCost());
601      sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
602      return plans;
603    }
604    LOG.info(
605      "Could not find a better moving plan.  Tried {} different configurations in "
606        + "{} ms, and did not find anything with an imbalance score less than {}",
607      step, endTime - startTime, initCost / sumMultiplier);
608    return null;
609  }
610
611  private void sendRejectionReasonToRingBuffer(String reason, List<CostFunction> costFunctions) {
612    if (this.isBalancerRejectionRecording) {
613      BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason);
614      if (costFunctions != null) {
615        for (CostFunction c : costFunctions) {
616          if (!c.isNeeded()) {
617            continue;
618          }
619          builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
620        }
621      }
622      namedQueueRecorder.addRecord(new BalancerRejectionDetails(builder.build()));
623    }
624  }
625
626  private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
627    double initCost, String initFunctionTotalCosts, long step) {
628    if (this.isBalancerDecisionRecording) {
629      List<String> regionPlans = new ArrayList<>();
630      for (RegionPlan plan : plans) {
631        regionPlans
632          .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
633            + " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
634      }
635      BalancerDecision balancerDecision = new BalancerDecision.Builder().setInitTotalCost(initCost)
636        .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost)
637        .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step)
638        .setRegionPlans(regionPlans).build();
639      namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision));
640    }
641  }
642
643  /**
644   * update costs to JMX
645   */
646  private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) {
647    if (tableName == null) {
648      return;
649    }
650
651    // check if the metricsBalancer is MetricsStochasticBalancer before casting
652    if (metricsBalancer instanceof MetricsStochasticBalancer) {
653      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
654      // overall cost
655      balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME,
656        "Overall cost", overall);
657
658      // each cost function
659      for (int i = 0; i < costFunctions.size(); i++) {
660        CostFunction costFunction = costFunctions.get(i);
661        String costFunctionName = costFunction.getClass().getSimpleName();
662        double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
663        // TODO: cost function may need a specific description
664        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
665          "The percent of " + costFunctionName, costPercent);
666      }
667    }
668  }
669
670  private void addCostFunction(CostFunction costFunction) {
671    float multiplier = costFunction.getMultiplier();
672    if (multiplier > 0) {
673      costFunctions.add(costFunction);
674    }
675  }
676
677  private String functionCost() {
678    StringBuilder builder = new StringBuilder();
679    for (CostFunction c : costFunctions) {
680      builder.append(c.getClass().getSimpleName());
681      builder.append(" : (");
682      if (c.isNeeded()) {
683        builder.append("multiplier=" + c.getMultiplier());
684        builder.append(", ");
685        double cost = c.cost();
686        builder.append("imbalance=" + cost);
687        if (cost >= minCostNeedBalance) {
688          builder.append(", need balance");
689        }
690      } else {
691        builder.append("not needed");
692      }
693      builder.append("); ");
694    }
695    return builder.toString();
696  }
697
698  private String totalCostsPerFunc() {
699    StringBuilder builder = new StringBuilder();
700    for (CostFunction c : costFunctions) {
701      if (!c.isNeeded()) {
702        continue;
703      }
704      double cost = c.getMultiplier() * c.cost();
705      if (cost > 0.0) {
706        builder.append(" ");
707        builder.append(c.getClass().getSimpleName());
708        builder.append(" : ");
709        builder.append(cost);
710        builder.append(";");
711      }
712    }
713    if (builder.length() > 0) {
714      builder.deleteCharAt(builder.length() - 1);
715    }
716    return builder.toString();
717  }
718
719  /**
720   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
721   * state.
722   * @param cluster The state of the cluster
723   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
724   */
725  private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
726    List<RegionPlan> plans = new ArrayList<>();
727    for (int regionIndex = 0; regionIndex
728        < cluster.regionIndexToServerIndex.length; regionIndex++) {
729      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
730      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
731
732      if (initialServerIndex != newServerIndex) {
733        RegionInfo region = cluster.regions[regionIndex];
734        ServerName initialServer = cluster.servers[initialServerIndex];
735        ServerName newServer = cluster.servers[newServerIndex];
736
737        if (LOG.isTraceEnabled()) {
738          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
739            + initialServer.getHostname() + " to " + newServer.getHostname());
740        }
741        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
742        plans.add(rp);
743      }
744    }
745    return plans;
746  }
747
748  /**
749   * Store the current region loads.
750   */
751  private void updateRegionLoad() {
752    // We create a new hashmap so that regions that are no longer there are removed.
753    // However we temporarily need the old loads so we can use them to keep the rolling average.
754    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
755    loads = new HashMap<>();
756
757    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
758      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
759        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
760        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
761        if (rLoads == null) {
762          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
763        } else if (rLoads.size() >= numRegionLoadsToRemember) {
764          rLoads.remove();
765        }
766        rLoads.add(new BalancerRegionLoad(rm));
767        loads.put(regionNameAsString, rLoads);
768      });
769    });
770  }
771
772  @RestrictedApi(explanation = "Should only be called in tests", link = "",
773      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
774  void initCosts(BalancerClusterState cluster) {
775    // Initialize the weights of generator every time
776    weightsOfGenerators = new double[this.candidateGenerators.size()];
777    for (CostFunction c : costFunctions) {
778      c.prepare(cluster);
779      c.updateWeight(weightsOfGenerators);
780    }
781  }
782
783  @RestrictedApi(explanation = "Should only be called in tests", link = "",
784      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
785  void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
786    // Reset all the weights to 0
787    for (int i = 0; i < weightsOfGenerators.length; i++) {
788      weightsOfGenerators[i] = 0;
789    }
790    for (CostFunction c : costFunctions) {
791      if (c.isNeeded()) {
792        c.postAction(action);
793        c.updateWeight(weightsOfGenerators);
794      }
795    }
796  }
797
798  /**
799   * Get the names of the cost functions
800   */
801  @RestrictedApi(explanation = "Should only be called in tests", link = "",
802      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
803  String[] getCostFunctionNames() {
804    String[] ret = new String[costFunctions.size()];
805    for (int i = 0; i < costFunctions.size(); i++) {
806      CostFunction c = costFunctions.get(i);
807      ret[i] = c.getClass().getSimpleName();
808    }
809
810    return ret;
811  }
812
813  /**
814   * This is the main cost function. It will compute a cost associated with a proposed cluster
815   * state. All different costs will be combined with their multipliers to produce a double cost.
816   * @param cluster      The state of the cluster
817   * @param previousCost the previous cost. This is used as an early out.
818   * @return a double of a cost associated with the proposed cluster state. This cost is an
819   *         aggregate of all individual cost functions.
820   */
821  @RestrictedApi(explanation = "Should only be called in tests", link = "",
822      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
823  double computeCost(BalancerClusterState cluster, double previousCost) {
824    double total = 0;
825
826    for (int i = 0; i < costFunctions.size(); i++) {
827      CostFunction c = costFunctions.get(i);
828      this.tempFunctionCosts[i] = 0.0;
829
830      if (!c.isNeeded()) {
831        continue;
832      }
833
834      Float multiplier = c.getMultiplier();
835      double cost = c.cost();
836
837      this.tempFunctionCosts[i] = multiplier * cost;
838      total += this.tempFunctionCosts[i];
839
840      if (total > previousCost) {
841        break;
842      }
843    }
844
845    return total;
846  }
847
848  /**
849   * A helper function to compose the attribute name from tablename and costfunction name
850   */
851  static String composeAttributeName(String tableName, String costFunctionName) {
852    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
853  }
854}