001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.lang.reflect.Constructor;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.Deque;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.StringJoiner;
031import java.util.concurrent.ThreadLocalRandom;
032import java.util.concurrent.TimeUnit;
033import java.util.function.Supplier;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ClusterMetrics;
036import org.apache.hadoop.hbase.HBaseInterfaceAudience;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.RegionMetrics;
039import org.apache.hadoop.hbase.ServerMetrics;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.BalancerDecision;
043import org.apache.hadoop.hbase.client.BalancerRejection;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.master.RackManager;
046import org.apache.hadoop.hbase.master.RegionPlan;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.hadoop.hbase.util.ReflectionUtils;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
055import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
056
057/**
058 * <p>
 * This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will randomly try to
 * mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the new cluster state becomes the plan.
 * It includes cost functions to compute the cost of:
062 * </p>
063 * <ul>
064 * <li>Region Load</li>
065 * <li>Table Load</li>
066 * <li>Data Locality</li>
067 * <li>Memstore Sizes</li>
068 * <li>Storefile Sizes</li>
069 * </ul>
070 * <p>
 * Every cost function returns a number between 0 and 1 inclusive, where 0 is the lowest-cost (best)
 * solution, and 1 is the highest possible cost and the worst solution. The computed costs are
073 * scaled by their respective multipliers:
074 * </p>
075 * <ul>
076 * <li>hbase.master.balancer.stochastic.regionLoadCost</li>
077 * <li>hbase.master.balancer.stochastic.moveCost</li>
078 * <li>hbase.master.balancer.stochastic.tableLoadCost</li>
079 * <li>hbase.master.balancer.stochastic.localityCost</li>
080 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
081 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
082 * </ul>
083 * <p>
 * You can also add custom cost functions by setting the following configuration value:
085 * </p>
086 * <ul>
087 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
088 * </ul>
089 * <p>
 * All custom cost functions need to extend {@link CostFunction}.
091 * </p>
092 * <p>
093 * In addition to the above configurations, the balancer can be tuned by the following configuration
094 * values:
095 * </p>
096 * <ul>
 * <li>hbase.master.balancer.stochastic.maxMoveRegions which controls the maximum number of regions
 * that can be moved in a single invocation of this balancer.</li>
099 * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
100 * regions is multiplied to try and get the number of times the balancer will mutate all
101 * servers.</li>
 * <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that the
 * balancer will try to mutate all the servers. The balancer will use the minimum of this value and
 * the above computation, or the maximum of the two when
 * hbase.master.balancer.stochastic.runMaxSteps is set to true.</li>
105 * </ul>
106 * <p>
107 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the
108 * balancer gets the full picture of all loads on the cluster.
109 * </p>
110 */
111@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
112public class StochasticLoadBalancer extends BaseLoadBalancer {
113
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Configuration keys and their defaults; the values are read in loadConf(Configuration).
  protected static final String STEPS_PER_REGION_KEY =
    "hbase.master.balancer.stochastic.stepsPerRegion";
  protected static final int DEFAULT_STEPS_PER_REGION = 800;
  protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
  protected static final int DEFAULT_MAX_STEPS = 1000000;
  protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps";
  protected static final boolean DEFAULT_RUN_MAX_STEPS = false;
  protected static final String MAX_RUNNING_TIME_KEY =
    "hbase.master.balancer.stochastic.maxRunningTime";
  protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds.
  protected static final String KEEP_REGION_LOADS =
    "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  protected static final int DEFAULT_KEEP_REGION_LOADS = 15;
  private static final String TABLE_FUNCTION_SEP = "_";
  protected static final String MIN_COST_NEED_BALANCE_KEY =
    "hbase.master.balancer.stochastic.minCostNeedBalance";
  protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f;
  // Comma-separated list of extra CostFunction class names (see loadCustomCostFunctions).
  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
    "hbase.master.balancer.stochastic.additionalCostFunctions";
  // Name under which the overall (weighted) cost is published to JMX.
  public static final String OVERALL_COST_FUNCTION_NAME = "Overall";

  // Recent region load samples passed into every BalancerClusterState.
  // NOTE(review): presumably keyed by region name and filled by updateRegionLoad(); verify.
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // values are defaults
  private int maxSteps = DEFAULT_MAX_STEPS;
  private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS;
  private int stepsPerRegion = DEFAULT_STEPS_PER_REGION;
  private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
  private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
  private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
  // Passed to BalancerClusterState in balanceTable; populated outside this view.
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>();

  protected List<CostFunction> costFunctions; // FindBugs: Wants this protected;
                                              // IS2_INCONSISTENT_SYNC
  // Sum of the multipliers of the cost functions that are needed for the cluster state being
  // evaluated. It has no initializer (so starts at 0) and is recomputed by needsBalance and
  // balanceTable before each use.
  private float sumMultiplier;
  // to save and report costs to JMX
  private double curOverallCost = 0d;
  // Per-cost-function scratch costs; copied into curFunctionCosts after a computeCost call.
  // Both arrays are sized to costFunctions.size() in loadConf.
  private double[] tempFunctionCosts;
  private double[] curFunctionCosts;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

  // Selection weight per generator class, read by getRandomGenerator (missing entries count as 0).
  private final Map<Class<? extends CandidateGenerator>, Double> weightsOfGenerators =
    new HashMap<>();
  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
  // Randomly ordered snapshot of candidateGenerators' keys. It is cached for 5 seconds, so it can
  // briefly disagree with candidateGenerators after the map is replaced.
  protected final Supplier<List<Class<? extends CandidateGenerator>>> shuffledGeneratorClasses =
    Suppliers.memoizeWithExpiration(() -> {
      List<Class<? extends CandidateGenerator>> shuffled =
        new ArrayList<>(candidateGenerators.keySet());
      Collections.shuffle(shuffled);
      return shuffled;
    }, 5, TimeUnit.SECONDS);
175
  /**
   * Creates a balancer that passes a new {@link MetricsStochasticBalancer} to
   * {@link BaseLoadBalancer}, replacing its default MetricsBalancer.
   */
  public StochasticLoadBalancer() {
    super(new MetricsStochasticBalancer());
  }

  /**
   * Creates a balancer that reports through the supplied metrics instance. Intended for tests only.
   * @param metricsStochasticBalancer the metrics instance handed to {@link BaseLoadBalancer}
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) {
    super(metricsStochasticBalancer);
  }
189
190  private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
191    Configuration conf) {
192    try {
193      Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class);
194      return ReflectionUtils.instantiate(clazz.getName(), ctor, conf);
195    } catch (NoSuchMethodException e) {
196      // will try construct with no parameter
197    }
198    return ReflectionUtils.newInstance(clazz);
199  }
200
201  private void loadCustomCostFunctions(Configuration conf) {
202    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
203
204    if (null == functionsNames) {
205      return;
206    }
207    for (String className : functionsNames) {
208      Class<? extends CostFunction> clazz;
209      try {
210        clazz = Class.forName(className).asSubclass(CostFunction.class);
211      } catch (ClassNotFoundException e) {
212        LOG.warn("Cannot load class '{}': {}", className, e.getMessage());
213        continue;
214      }
215      CostFunction func = createCostFunction(clazz, conf);
216      LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
217      costFunctions.add(func);
218    }
219  }
220
221  @RestrictedApi(explanation = "Should only be called in tests", link = "",
222      allowedOnPath = ".*/src/test/.*")
223  Map<Class<? extends CandidateGenerator>, CandidateGenerator> getCandidateGenerators() {
224    return this.candidateGenerators;
225  }
226
227  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
228    createCandidateGenerators() {
229    Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
230      new HashMap<>(5);
231    candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator());
232    candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator());
233    candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator);
234    candidateGenerators.put(RegionReplicaCandidateGenerator.class,
235      new RegionReplicaCandidateGenerator());
236    candidateGenerators.put(RegionReplicaRackCandidateGenerator.class,
237      new RegionReplicaRackCandidateGenerator());
238    return candidateGenerators;
239  }
240
241  protected List<CostFunction> createCostFunctions(Configuration conf) {
242    List<CostFunction> costFunctions = new ArrayList<>();
243    addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf));
244    addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf));
245    addCostFunction(costFunctions, new MoveCostFunction(conf, provider));
246    addCostFunction(costFunctions, localityCost);
247    addCostFunction(costFunctions, rackLocalityCost);
248    addCostFunction(costFunctions, new TableSkewCostFunction(conf));
249    addCostFunction(costFunctions, regionReplicaHostCostFunction);
250    addCostFunction(costFunctions, regionReplicaRackCostFunction);
251    addCostFunction(costFunctions, new ReadRequestCostFunction(conf));
252    addCostFunction(costFunctions, new CPRequestCostFunction(conf));
253    addCostFunction(costFunctions, new WriteRequestCostFunction(conf));
254    addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf));
255    addCostFunction(costFunctions, new StoreFileCostFunction(conf));
256    return costFunctions;
257  }
258
259  @Override
260  protected void loadConf(Configuration conf) {
261    super.loadConf(conf);
262    maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS);
263    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION);
264    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME);
265    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS);
266
267    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS);
268    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE);
269    localityCandidateGenerator = new LocalityBasedCandidateGenerator();
270    localityCost = new ServerLocalityCostFunction(conf);
271    rackLocalityCost = new RackLocalityCostFunction(conf);
272
273    this.candidateGenerators = createCandidateGenerators();
274
275    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
276    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
277    this.costFunctions = createCostFunctions(conf);
278    loadCustomCostFunctions(conf);
279
280    curFunctionCosts = new double[costFunctions.size()];
281    tempFunctionCosts = new double[costFunctions.size()];
282
283    LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps
284      + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable="
285      + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames())
286      + " , sum of multiplier of cost functions = " + sumMultiplier + " etc.");
287  }
288
289  @Override
290  public void updateClusterMetrics(ClusterMetrics st) {
291    super.updateClusterMetrics(st);
292    updateRegionLoad();
293
294    // update metrics size
295    try {
296      // by-table or ensemble mode
297      int tablesCount = isByTable ? provider.getNumberOfTables() : 1;
298      int functionsCount = getCostFunctionNames().length;
299
300      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
301    } catch (Exception e) {
302      LOG.error("failed to get the size of all tables", e);
303    }
304  }
305
306  private void updateBalancerTableLoadInfo(TableName tableName,
307    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
308    RegionHDFSBlockLocationFinder finder = null;
309    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
310      finder = this.regionFinder;
311    }
312    BalancerClusterState cluster =
313      new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
314
315    initCosts(cluster);
316    curOverallCost = computeCost(cluster, Double.MAX_VALUE);
317    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
318    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
319  }
320
321  @Override
322  public void
323    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
324    if (isByTable) {
325      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
326        updateBalancerTableLoadInfo(tableName, loadOfOneTable);
327      });
328    } else {
329      updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME,
330        toEnsumbleTableLoad(loadOfAllTable));
331    }
332  }
333
334  /**
335   * Update the number of metrics that are reported to JMX
336   */
337  @RestrictedApi(explanation = "Should only be called in tests", link = "",
338      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
339  void updateMetricsSize(int size) {
340    if (metricsBalancer instanceof MetricsStochasticBalancer) {
341      ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
342    }
343  }
344
345  private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c) {
346    if (c.numHosts >= c.maxReplicas) {
347      regionReplicaHostCostFunction.prepare(c);
348      double hostCost = Math.abs(regionReplicaHostCostFunction.cost());
349      boolean colocatedAtHost = hostCost > CostFunction.getCostEpsilon(hostCost);
350      if (colocatedAtHost) {
351        return true;
352      }
353      LOG.trace("No host colocation detected with host cost={}", hostCost);
354    }
355    return false;
356  }
357
358  private boolean areSomeRegionReplicasColocatedOnRack(BalancerClusterState c) {
359    if (c.numRacks >= c.maxReplicas) {
360      regionReplicaRackCostFunction.prepare(c);
361      double rackCost = Math.abs(regionReplicaRackCostFunction.cost());
362      boolean colocatedAtRack = rackCost > CostFunction.getCostEpsilon(rackCost);
363      if (colocatedAtRack) {
364        return true;
365      }
366      LOG.trace("No rack colocation detected with rack cost={}", rackCost);
367    } else {
368      LOG.trace("Rack colocation is inevitable with fewer racks than replicas, "
369        + "so we won't bother checking");
370    }
371    return false;
372  }
373
374  private String getBalanceReason(double total, double sumMultiplier) {
375    if (total <= 0) {
376      return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
377    } else if (sumMultiplier <= 0) {
378      return "sumMultiplier = " + sumMultiplier + " <= 0";
379    } else if ((total / sumMultiplier) < minCostNeedBalance) {
380      return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = "
381        + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
382    } else {
383      return "";
384    }
385  }
386
387  @RestrictedApi(explanation = "Should only be called in tests", link = "",
388      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
389  boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
390    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
391    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
392      LOG.info(
393        "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
394      sendRejectionReasonToRingBuffer(() -> "The number of RegionServers " + cs.getNumServers()
395        + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
396      return false;
397    }
398
399    if (areSomeRegionReplicasColocatedOnHost(cluster)) {
400      LOG.info("Running balancer because at least one server hosts replicas of the same region."
401        + " function cost={}", functionCost());
402      return true;
403    }
404
405    if (areSomeRegionReplicasColocatedOnRack(cluster)) {
406      LOG.info("Running balancer because at least one rack hosts replicas of the same region."
407        + " function cost={}", functionCost());
408      return true;
409    }
410
411    if (idleRegionServerExist(cluster)) {
412      LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}",
413        functionCost());
414      return true;
415    }
416
417    if (sloppyRegionServerExist(cs)) {
418      LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}",
419        functionCost());
420      return true;
421    }
422
423    double total = 0.0;
424    float localSumMultiplier = 0; // in case this.sumMultiplier is not initialized
425    for (CostFunction c : costFunctions) {
426      if (!c.isNeeded()) {
427        LOG.trace("{} not needed", c.getClass().getSimpleName());
428        continue;
429      }
430      total += c.cost() * c.getMultiplier();
431      localSumMultiplier += c.getMultiplier();
432    }
433    sumMultiplier = localSumMultiplier;
434    boolean balanced = (total / sumMultiplier < minCostNeedBalance);
435
436    if (balanced) {
437      final double calculatedTotal = total;
438      sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, sumMultiplier),
439        costFunctions);
440      LOG.info(
441        "{} - skipping load balancing because weighted average imbalance={} <= "
442          + "threshold({}). If you want more aggressive balancing, either lower "
443          + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative "
444          + "multiplier(s) of the specific cost function(s). functionCost={}",
445        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier,
446        minCostNeedBalance, minCostNeedBalance, functionCost());
447    } else {
448      LOG.info("{} - Calculating plan. may take up to {}ms to complete.",
449        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime);
450    }
451    return !balanced;
452  }
453
454  @RestrictedApi(explanation = "Should only be called in tests", link = "",
455      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
456  Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState cluster) {
457    CandidateGenerator generator = getRandomGenerator();
458    return Pair.newPair(generator, generator.generate(cluster));
459  }
460
461  /**
462   * Select the candidate generator to use based on the cost of cost functions. The chance of
463   * selecting a candidate generator is proportional to the share of cost of all cost functions
464   * among all cost functions that benefit from it.
465   */
466  protected CandidateGenerator getRandomGenerator() {
467    Preconditions.checkState(!candidateGenerators.isEmpty(), "No candidate generators available.");
468    List<Class<? extends CandidateGenerator>> generatorClasses = shuffledGeneratorClasses.get();
469    List<Double> partialSums = new ArrayList<>(generatorClasses.size());
470    double sum = 0.0;
471    for (Class<? extends CandidateGenerator> clazz : generatorClasses) {
472      double weight = weightsOfGenerators.getOrDefault(clazz, 0.0);
473      sum += weight;
474      partialSums.add(sum);
475    }
476
477    // If the sum of all weights is zero, fall back to any generator
478    if (sum == 0.0) {
479      return pickAnyGenerator(generatorClasses);
480    }
481
482    double rand = ThreadLocalRandom.current().nextDouble();
483    // Normalize partial sums so that the last one should be exactly 1.0
484    for (int i = 0; i < partialSums.size(); i++) {
485      partialSums.set(i, partialSums.get(i) / sum);
486    }
487
488    // Generate a random number and pick the first generator whose partial sum is >= rand
489    for (int i = 0; i < partialSums.size(); i++) {
490      if (rand <= partialSums.get(i)) {
491        return candidateGenerators.get(generatorClasses.get(i));
492      }
493    }
494
495    // Fallback: if for some reason we didn't return above, return any generator
496    return pickAnyGenerator(generatorClasses);
497  }
498
499  private CandidateGenerator
500    pickAnyGenerator(List<Class<? extends CandidateGenerator>> generatorClasses) {
501    Class<? extends CandidateGenerator> randomClass =
502      generatorClasses.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
503    return candidateGenerators.get(randomClass);
504  }
505
506  @RestrictedApi(explanation = "Should only be called in tests", link = "",
507      allowedOnPath = ".*/src/test/.*")
508  void setRackManager(RackManager rackManager) {
509    this.rackManager = rackManager;
510  }
511
512  private long calculateMaxSteps(BalancerClusterState cluster) {
513    return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers;
514  }
515
516  /**
517   * Given the cluster state this will try and approach an optimal balance. This should always
518   * approach the optimal state given enough steps.
519   */
520  @Override
521  protected List<RegionPlan> balanceTable(TableName tableName,
522    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
523    // On clusters with lots of HFileLinks or lots of reference files,
524    // instantiating the storefile infos can be quite expensive.
525    // Allow turning this feature off if the locality cost is not going to
526    // be used in any computations.
527    RegionHDFSBlockLocationFinder finder = null;
528    if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
529      finder = this.regionFinder;
530    }
531
532    // The clusterState that is given to this method contains the state
533    // of all the regions in the table(s) (that's true today)
534    // Keep track of servers to iterate through them.
535    BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder,
536      rackManager, regionCacheRatioOnOldServerMap);
537
538    long startTime = EnvironmentEdgeManager.currentTime();
539
540    initCosts(cluster);
541
542    float localSumMultiplier = 0;
543    for (CostFunction c : costFunctions) {
544      if (c.isNeeded()) {
545        localSumMultiplier += c.getMultiplier();
546      }
547    }
548    sumMultiplier = localSumMultiplier;
549    if (sumMultiplier <= 0) {
550      LOG.error("At least one cost function needs a multiplier > 0. For example, set "
551        + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default");
552      return null;
553    }
554
555    double currentCost = computeCost(cluster, Double.MAX_VALUE);
556    curOverallCost = currentCost;
557    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
558    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
559    double initCost = currentCost;
560    double newCost;
561
562    if (!needsBalance(tableName, cluster)) {
563      return null;
564    }
565
566    long computedMaxSteps;
567    if (runMaxSteps) {
568      computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster));
569    } else {
570      long calculatedMaxSteps = calculateMaxSteps(cluster);
571      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
572      if (calculatedMaxSteps > maxSteps) {
573        LOG.warn(
574          "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
575            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
576            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
577            + "(This config change does not require service restart)",
578          calculatedMaxSteps, maxSteps);
579      }
580    }
581    LOG.info(
582      "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, "
583        + "functionCost={} computedMaxSteps={}",
584      currentCost / sumMultiplier, functionCost(), computedMaxSteps);
585
586    final String initFunctionTotalCosts = totalCostsPerFunc();
587    // Perform a stochastic walk to see if we can get a good fit.
588    long step;
589    Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new HashMap<>();
590    Map<Class<? extends CandidateGenerator>, Long> generatorToApprovedActionCount = new HashMap<>();
591    for (step = 0; step < computedMaxSteps; step++) {
592      Pair<CandidateGenerator, BalanceAction> nextAction = nextAction(cluster);
593      CandidateGenerator generator = nextAction.getFirst();
594      BalanceAction action = nextAction.getSecond();
595
596      if (action.getType() == BalanceAction.Type.NULL) {
597        continue;
598      }
599
600      cluster.doAction(action);
601      updateCostsAndWeightsWithAction(cluster, action);
602      generatorToStepCount.merge(generator.getClass(), 1L, Long::sum);
603
604      newCost = computeCost(cluster, currentCost);
605
606      // Should this be kept?
607      if (newCost < currentCost) {
608        currentCost = newCost;
609        generatorToApprovedActionCount.merge(generator.getClass(), 1L, Long::sum);
610
611        // save for JMX
612        curOverallCost = currentCost;
613        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
614      } else {
615        // Put things back the way they were before.
616        // TODO: undo by remembering old values
617        BalanceAction undoAction = action.undoAction();
618        cluster.doAction(undoAction);
619        updateCostsAndWeightsWithAction(cluster, undoAction);
620      }
621
622      if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) {
623        break;
624      }
625    }
626    long endTime = EnvironmentEdgeManager.currentTime();
627
628    StringJoiner joiner = new StringJoiner("\n");
629    joiner.add("CandidateGenerator activity summary:");
630    generatorToStepCount.forEach((generator, count) -> {
631      long approvals = generatorToApprovedActionCount.getOrDefault(generator, 0L);
632      joiner.add(String.format(" - %s: %d steps, %d approvals", generator.getSimpleName(), count,
633        approvals));
634    });
635    LOG.debug(joiner.toString());
636
637    metricsBalancer.balanceCluster(endTime - startTime);
638
639    if (initCost > currentCost) {
640      updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
641      List<RegionPlan> plans = createRegionPlans(cluster);
642      LOG.info(
643        "Finished computing new moving plan. Computation took {} ms"
644          + " to try {} different iterations.  Found a solution that moves "
645          + "{} regions; Going from a computed imbalance of {}"
646          + " to a new imbalance of {}. funtionCost={}",
647        endTime - startTime, step, plans.size(), initCost / sumMultiplier,
648        currentCost / sumMultiplier, functionCost());
649      sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
650      return plans;
651    }
652    LOG.info(
653      "Could not find a better moving plan.  Tried {} different configurations in "
654        + "{} ms, and did not find anything with an imbalance score less than {}",
655      step, endTime - startTime, initCost / sumMultiplier);
656    return null;
657  }
658
659  protected void sendRejectionReasonToRingBuffer(Supplier<String> reason,
660    List<CostFunction> costFunctions) {
661    provider.recordBalancerRejection(() -> {
662      BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get());
663      if (costFunctions != null) {
664        for (CostFunction c : costFunctions) {
665          if (!c.isNeeded()) {
666            continue;
667          }
668          builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
669        }
670      }
671      return builder.build();
672    });
673  }
674
675  private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
676    double initCost, String initFunctionTotalCosts, long step) {
677    provider.recordBalancerDecision(() -> {
678      List<String> regionPlans = new ArrayList<>();
679      for (RegionPlan plan : plans) {
680        regionPlans
681          .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
682            + " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
683      }
684      return new BalancerDecision.Builder().setInitTotalCost(initCost)
685        .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost)
686        .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step)
687        .setRegionPlans(regionPlans).build();
688    });
689  }
690
691  /**
692   * update costs to JMX
693   */
694  private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) {
695    if (tableName == null) {
696      return;
697    }
698
699    // check if the metricsBalancer is MetricsStochasticBalancer before casting
700    if (metricsBalancer instanceof MetricsStochasticBalancer) {
701      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
702      // overall cost
703      balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME,
704        "Overall cost", overall);
705
706      // each cost function
707      for (int i = 0; i < costFunctions.size(); i++) {
708        CostFunction costFunction = costFunctions.get(i);
709        String costFunctionName = costFunction.getClass().getSimpleName();
710        double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
711        // TODO: cost function may need a specific description
712        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
713          "The percent of " + costFunctionName, costPercent);
714      }
715    }
716  }
717
718  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
719    float multiplier = costFunction.getMultiplier();
720    if (multiplier > 0) {
721      costFunctions.add(costFunction);
722    }
723  }
724
725  protected String functionCost() {
726    StringBuilder builder = new StringBuilder();
727    for (CostFunction c : costFunctions) {
728      builder.append(c.getClass().getSimpleName());
729      builder.append(" : (");
730      if (c.isNeeded()) {
731        builder.append("multiplier=" + c.getMultiplier());
732        builder.append(", ");
733        double cost = c.cost();
734        builder.append("imbalance=" + cost);
735        if (cost >= minCostNeedBalance) {
736          builder.append(", need balance");
737        }
738      } else {
739        builder.append("not needed");
740      }
741      builder.append("); ");
742    }
743    return builder.toString();
744  }
745
746  @RestrictedApi(explanation = "Should only be called in tests", link = "",
747      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
748  List<CostFunction> getCostFunctions() {
749    return costFunctions;
750  }
751
752  private String totalCostsPerFunc() {
753    StringBuilder builder = new StringBuilder();
754    for (CostFunction c : costFunctions) {
755      if (!c.isNeeded()) {
756        continue;
757      }
758      double cost = c.getMultiplier() * c.cost();
759      if (cost > 0.0) {
760        builder.append(" ");
761        builder.append(c.getClass().getSimpleName());
762        builder.append(" : ");
763        builder.append(cost);
764        builder.append(";");
765      }
766    }
767    if (builder.length() > 0) {
768      builder.deleteCharAt(builder.length() - 1);
769    }
770    return builder.toString();
771  }
772
773  /**
774   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
775   * state.
776   * @param cluster The state of the cluster
777   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
778   */
779  private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
780    List<RegionPlan> plans = new ArrayList<>();
781    for (int regionIndex = 0; regionIndex
782        < cluster.regionIndexToServerIndex.length; regionIndex++) {
783      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
784      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
785
786      if (initialServerIndex != newServerIndex) {
787        RegionInfo region = cluster.regions[regionIndex];
788        ServerName initialServer = cluster.servers[initialServerIndex];
789        ServerName newServer = cluster.servers[newServerIndex];
790
791        if (LOG.isTraceEnabled()) {
792          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
793            + initialServer.getHostname() + " to " + newServer.getHostname());
794        }
795        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
796        plans.add(rp);
797      }
798    }
799    return plans;
800  }
801
802  /**
803   * Store the current region loads.
804   */
805  private void updateRegionLoad() {
806    // We create a new hashmap so that regions that are no longer there are removed.
807    // However we temporarily need the old loads so we can use them to keep the rolling average.
808    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
809    loads = new HashMap<>();
810
811    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
812      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
813        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
814        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
815        if (rLoads == null) {
816          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
817        } else if (rLoads.size() >= numRegionLoadsToRemember) {
818          rLoads.remove();
819        }
820        rLoads.add(new BalancerRegionLoad(rm));
821        loads.put(regionNameAsString, rLoads);
822      });
823    });
824  }
825
826  @RestrictedApi(explanation = "Should only be called in tests", link = "",
827      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
828  void initCosts(BalancerClusterState cluster) {
829    weightsOfGenerators.clear();
830    for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
831      weightsOfGenerators.put(clazz, 0.0);
832    }
833    for (CostFunction c : costFunctions) {
834      c.prepare(cluster);
835      c.updateWeight(weightsOfGenerators);
836    }
837  }
838
839  /**
840   * Update both the costs of costfunctions and the weights of candidate generators
841   */
842  @RestrictedApi(explanation = "Should only be called in tests", link = "",
843      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
844  void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
845    // Reset all the weights to 0
846    for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
847      weightsOfGenerators.put(clazz, 0.0);
848    }
849    for (CostFunction c : costFunctions) {
850      if (c.isNeeded()) {
851        c.postAction(action);
852        c.updateWeight(weightsOfGenerators);
853      }
854    }
855  }
856
857  /**
858   * Get the names of the cost functions
859   */
860  @RestrictedApi(explanation = "Should only be called in tests", link = "",
861      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
862  String[] getCostFunctionNames() {
863    String[] ret = new String[costFunctions.size()];
864    for (int i = 0; i < costFunctions.size(); i++) {
865      CostFunction c = costFunctions.get(i);
866      ret[i] = c.getClass().getSimpleName();
867    }
868
869    return ret;
870  }
871
872  /**
873   * This is the main cost function. It will compute a cost associated with a proposed cluster
874   * state. All different costs will be combined with their multipliers to produce a double cost.
875   * @param cluster      The state of the cluster
876   * @param previousCost the previous cost. This is used as an early out.
877   * @return a double of a cost associated with the proposed cluster state. This cost is an
878   *         aggregate of all individual cost functions.
879   */
880  @RestrictedApi(explanation = "Should only be called in tests", link = "",
881      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
882  double computeCost(BalancerClusterState cluster, double previousCost) {
883    double total = 0;
884
885    for (int i = 0; i < costFunctions.size(); i++) {
886      CostFunction c = costFunctions.get(i);
887      this.tempFunctionCosts[i] = 0.0;
888
889      if (!c.isNeeded()) {
890        continue;
891      }
892
893      Float multiplier = c.getMultiplier();
894      double cost = c.cost();
895
896      this.tempFunctionCosts[i] = multiplier * cost;
897      total += this.tempFunctionCosts[i];
898
899      if (total > previousCost) {
900        break;
901      }
902    }
903
904    return total;
905  }
906
907  /**
908   * A helper function to compose the attribute name from tablename and costfunction name
909   */
910  static String composeAttributeName(String tableName, String costFunctionName) {
911    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
912  }
913}