001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Deque;
025import java.util.HashMap;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Objects;
030import java.util.Random;
031import java.util.stream.Collectors;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.ClusterMetrics;
035import org.apache.hadoop.hbase.HBaseInterfaceAudience;
036import org.apache.hadoop.hbase.RegionMetrics;
037import org.apache.hadoop.hbase.ServerMetrics;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.master.MasterServices;
042import org.apache.hadoop.hbase.master.RegionPlan;
043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
048import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.hadoop.hbase.util.ReflectionUtils;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
056import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
057
058
059/**
 * <p>This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C), then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
063 * <ul>
064 * <li>Region Load</li>
065 * <li>Table Load</li>
066 * <li>Data Locality</li>
067 * <li>Memstore Sizes</li>
068 * <li>Storefile Sizes</li>
069 * </ul>
070 *
071 *
 * <p>Every cost function returns a number between 0 and 1 inclusive, where 0 is the lowest-cost,
 * best solution and 1 is the highest possible cost and the worst solution. The computed costs are
 * scaled by their respective multipliers:</p>
075 *
076 * <ul>
077 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
078 *   <li>hbase.master.balancer.stochastic.moveCost</li>
079 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
080 *   <li>hbase.master.balancer.stochastic.localityCost</li>
081 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
082 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
083 * </ul>
084 *
 * <p>You can also add custom cost functions by setting the following configuration value:</p>
086 * <ul>
087 *     <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
088 * </ul>
089 *
 * <p>All custom cost functions need to extend {@link StochasticLoadBalancer.CostFunction}.</p>
091 *
092 * <p>In addition to the above configurations, the balancer can be tuned by the following
093 * configuration values:</p>
094 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions, which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
 *   <li>hbase.master.balancer.stochastic.stepsPerRegion, the coefficient by which the number of
 *   regions (times the number of servers) is multiplied to get the number of steps the balancer
 *   will take when mutating the cluster.</li>
 *   <li>hbase.master.balancer.stochastic.maxSteps, which controls the maximum number of steps
 *   the balancer will take. The balancer uses the minimum of this value and the above
 *   computation (or the maximum, when hbase.master.balancer.stochastic.runMaxSteps is true).</li>
104 * </ul>
105 *
106 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
107 * so that the balancer gets the full picture of all loads on the cluster.</p>
108 */
109@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
110@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
111  justification="Complaint is about costFunctions not being synchronized; not end of the world")
112public class StochasticLoadBalancer extends BaseLoadBalancer {
113
114  protected static final String STEPS_PER_REGION_KEY =
115      "hbase.master.balancer.stochastic.stepsPerRegion";
116  protected static final String MAX_STEPS_KEY =
117      "hbase.master.balancer.stochastic.maxSteps";
118  protected static final String RUN_MAX_STEPS_KEY =
119      "hbase.master.balancer.stochastic.runMaxSteps";
120  protected static final String MAX_RUNNING_TIME_KEY =
121      "hbase.master.balancer.stochastic.maxRunningTime";
122  protected static final String KEEP_REGION_LOADS =
123      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
124  private static final String TABLE_FUNCTION_SEP = "_";
125  protected static final String MIN_COST_NEED_BALANCE_KEY =
126      "hbase.master.balancer.stochastic.minCostNeedBalance";
127  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
128          "hbase.master.balancer.stochastic.additionalCostFunctions";
129
130  protected static final Random RANDOM = new Random(System.currentTimeMillis());
131  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
132
133  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
134
135  // values are defaults
136  private int maxSteps = 1000000;
137  private boolean runMaxSteps = false;
138  private int stepsPerRegion = 800;
139  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
140  private int numRegionLoadsToRemember = 15;
141  private float minCostNeedBalance = 0.05f;
142
143  private List<CandidateGenerator> candidateGenerators;
144  private CostFromRegionLoadFunction[] regionLoadFunctions;
145  private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
146
147  // to save and report costs to JMX
148  private Double curOverallCost = 0d;
149  private Double[] tempFunctionCosts;
150  private Double[] curFunctionCosts;
151
152  // Keep locality based picker and cost function to alert them
153  // when new services are offered
154  private LocalityBasedCandidateGenerator localityCandidateGenerator;
155  private ServerLocalityCostFunction localityCost;
156  private RackLocalityCostFunction rackLocalityCost;
157  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
158  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
159
160  /**
161   * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
162   * default MetricsBalancer
163   */
164  public StochasticLoadBalancer() {
165    super(new MetricsStochasticBalancer());
166  }
167
168  @Override
169  public void onConfigurationChange(Configuration conf) {
170    setConf(conf);
171  }
172
173  @Override
174  public synchronized void setConf(Configuration conf) {
175    super.setConf(conf);
176    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
177    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
178    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
179    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
180
181    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
182    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
183    if (localityCandidateGenerator == null) {
184      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
185    }
186    localityCost = new ServerLocalityCostFunction(conf, services);
187    rackLocalityCost = new RackLocalityCostFunction(conf, services);
188
189    if (this.candidateGenerators == null) {
190      candidateGenerators = Lists.newArrayList();
191      candidateGenerators.add(new RandomCandidateGenerator());
192      candidateGenerators.add(new LoadCandidateGenerator());
193      candidateGenerators.add(localityCandidateGenerator);
194      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
195    }
196    regionLoadFunctions = new CostFromRegionLoadFunction[] {
197      new ReadRequestCostFunction(conf),
198      new WriteRequestCostFunction(conf),
199      new MemStoreSizeCostFunction(conf),
200      new StoreFileCostFunction(conf)
201    };
202    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
203    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
204
205    costFunctions = new ArrayList<>();
206    costFunctions.add(new RegionCountSkewCostFunction(conf));
207    costFunctions.add(new PrimaryRegionCountSkewCostFunction(conf));
208    costFunctions.add(new MoveCostFunction(conf));
209    costFunctions.add(localityCost);
210    costFunctions.add(rackLocalityCost);
211    costFunctions.add(new TableSkewCostFunction(conf));
212    costFunctions.add(regionReplicaHostCostFunction);
213    costFunctions.add(regionReplicaRackCostFunction);
214    costFunctions.add(regionLoadFunctions[0]);
215    costFunctions.add(regionLoadFunctions[1]);
216    costFunctions.add(regionLoadFunctions[2]);
217    costFunctions.add(regionLoadFunctions[3]);
218    loadCustomCostFunctions(conf);
219
220    curFunctionCosts = new Double[costFunctions.size()];
221    tempFunctionCosts = new Double[costFunctions.size()];
222
223    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
224            ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
225            Arrays.toString(getCostFunctionNames()) + " etc.");
226  }
227
228  private void loadCustomCostFunctions(Configuration conf) {
229    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
230
231    if (null == functionsNames) {
232      return;
233    }
234
235    costFunctions.addAll(Arrays.stream(functionsNames)
236            .map(c -> {
237              Class<? extends CostFunction> klass = null;
238              try {
239                klass = (Class<? extends CostFunction>) Class.forName(c);
240              } catch (ClassNotFoundException e) {
241                LOG.warn("Cannot load class " + c + "': " + e.getMessage());
242              }
243              if (null == klass) {
244                return null;
245              }
246
247              CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
248              LOG.info("Successfully loaded custom CostFunction '" +
249                      reflected.getClass().getSimpleName() + "'");
250
251              return reflected;
252            })
253            .filter(Objects::nonNull)
254            .collect(Collectors.toList()));
255  }
256
257  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
258    this.candidateGenerators = customCandidateGenerators;
259  }
260
261  @Override
262  protected void setSlop(Configuration conf) {
263    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
264  }
265
266  @Override
267  public synchronized void setClusterMetrics(ClusterMetrics st) {
268    super.setClusterMetrics(st);
269    updateRegionLoad();
270    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
271      cost.setClusterMetrics(st);
272    }
273
274    // update metrics size
275    try {
276      // by-table or ensemble mode
277      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
278      int functionsCount = getCostFunctionNames().length;
279
280      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
281    } catch (Exception e) {
282      LOG.error("failed to get the size of all tables", e);
283    }
284  }
285
286  /**
287   * Update the number of metrics that are reported to JMX
288   */
289  public void updateMetricsSize(int size) {
290    if (metricsBalancer instanceof MetricsStochasticBalancer) {
291        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
292    }
293  }
294
295  @Override
296  public synchronized void setMasterServices(MasterServices masterServices) {
297    super.setMasterServices(masterServices);
298    this.localityCost.setServices(masterServices);
299    this.rackLocalityCost.setServices(masterServices);
300    this.localityCandidateGenerator.setServices(masterServices);
301  }
302
303  @Override
304  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
305    regionReplicaHostCostFunction.init(c);
306    if (regionReplicaHostCostFunction.cost() > 0) return true;
307    regionReplicaRackCostFunction.init(c);
308    if (regionReplicaRackCostFunction.cost() > 0) return true;
309    return false;
310  }
311
312  @Override
313  protected boolean needsBalance(TableName tableName, Cluster cluster) {
314    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
315    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
316      if (LOG.isDebugEnabled()) {
317        LOG.debug("Not running balancer because only " + cs.getNumServers()
318            + " active regionserver(s)");
319      }
320      return false;
321    }
322    if (areSomeRegionReplicasColocated(cluster)) {
323      return true;
324    }
325
326    if (idleRegionServerExist(cluster)){
327      return true;
328    }
329
330    double total = 0.0;
331    float sumMultiplier = 0.0f;
332    for (CostFunction c : costFunctions) {
333      float multiplier = c.getMultiplier();
334      if (multiplier <= 0) {
335        LOG.trace("{} not needed because multiplier is <= 0", c.getClass().getSimpleName());
336        continue;
337      }
338      if (!c.isNeeded()) {
339        LOG.trace("{} not needed", c.getClass().getSimpleName());
340        continue;
341      }
342      sumMultiplier += multiplier;
343      total += c.cost() * multiplier;
344    }
345
346    boolean balanced = total <= 0 || sumMultiplier <= 0 ||
347        (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance);
348    if (LOG.isDebugEnabled()) {
349      LOG.debug("{} {}; total cost={}, sum multiplier={}; cost/multiplier to need a balance is {}",
350          balanced ? "Skipping load balancing because balanced" : "We need to load balance",
351          isByTable ? String.format("table (%s)", tableName) : "cluster",
352          total, sumMultiplier, minCostNeedBalance);
353      if (LOG.isTraceEnabled()) {
354        LOG.trace("Balance decision detailed function costs={}", functionCost());
355      }
356    }
357    return !balanced;
358  }
359
360  @VisibleForTesting
361  Cluster.Action nextAction(Cluster cluster) {
362    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
363            .generate(cluster);
364  }
365
366  /**
367   * Given the cluster state this will try and approach an optimal balance. This
368   * should always approach the optimal state given enough steps.
369   */
370  @Override
371  public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
372    List<RegionInfo>> loadOfOneTable) {
373    List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable);
374    if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
375      return plans;
376    }
377
378    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
379      if (loadOfOneTable.size() <= 2) {
380        return null;
381      }
382      loadOfOneTable = new HashMap<>(loadOfOneTable);
383      loadOfOneTable.remove(masterServerName);
384    }
385
386    // On clusters with lots of HFileLinks or lots of reference files,
387    // instantiating the storefile infos can be quite expensive.
388    // Allow turning this feature off if the locality cost is not going to
389    // be used in any computations.
390    RegionLocationFinder finder = null;
391    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
392        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
393      finder = this.regionFinder;
394    }
395
396    //The clusterState that is given to this method contains the state
397    //of all the regions in the table(s) (that's true today)
398    // Keep track of servers to iterate through them.
399    Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager);
400
401    long startTime = EnvironmentEdgeManager.currentTime();
402
403    initCosts(cluster);
404
405    if (!needsBalance(tableName, cluster)) {
406      return null;
407    }
408
409    double currentCost = computeCost(cluster, Double.MAX_VALUE);
410    curOverallCost = currentCost;
411    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
412    double initCost = currentCost;
413    double newCost = currentCost;
414
415    long computedMaxSteps;
416    if (runMaxSteps) {
417      computedMaxSteps = Math.max(this.maxSteps,
418          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
419    } else {
420      long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion *
421          (long)cluster.numServers;
422      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
423      if (calculatedMaxSteps > maxSteps) {
424        LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
425            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
426            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
427            + "(This config change does not require service restart)", calculatedMaxSteps,
428            maxSteps);
429      }
430    }
431    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
432        + functionCost() + " computedMaxSteps: " + computedMaxSteps);
433
434    // Perform a stochastic walk to see if we can get a good fit.
435    long step;
436
437    for (step = 0; step < computedMaxSteps; step++) {
438      Cluster.Action action = nextAction(cluster);
439
440      if (action.type == Type.NULL) {
441        continue;
442      }
443
444      cluster.doAction(action);
445      updateCostsWithAction(cluster, action);
446
447      newCost = computeCost(cluster, currentCost);
448
449      // Should this be kept?
450      if (newCost < currentCost) {
451        currentCost = newCost;
452
453        // save for JMX
454        curOverallCost = currentCost;
455        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
456      } else {
457        // Put things back the way they were before.
458        // TODO: undo by remembering old values
459        Action undoAction = action.undoAction();
460        cluster.doAction(undoAction);
461        updateCostsWithAction(cluster, undoAction);
462      }
463
464      if (EnvironmentEdgeManager.currentTime() - startTime >
465          maxRunningTime) {
466        break;
467      }
468    }
469    long endTime = EnvironmentEdgeManager.currentTime();
470
471    metricsBalancer.balanceCluster(endTime - startTime);
472
473    // update costs metrics
474    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
475    if (initCost > currentCost) {
476      plans = createRegionPlans(cluster);
477      LOG.info("Finished computing new load balance plan. Computation took {}" +
478        " to try {} different iterations.  Found a solution that moves " +
479        "{} regions; Going from a computed cost of {}" +
480        " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime),
481        step, plans.size(), initCost, currentCost);
482      return plans;
483    }
484    LOG.info("Could not find a better load balance plan.  Tried {} different configurations in " +
485      "{}, and did not find anything with a computed cost less than {}", step,
486      java.time.Duration.ofMillis(endTime - startTime), initCost);
487    return null;
488  }
489
490  /**
491   * update costs to JMX
492   */
493  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
494    if (tableName == null) return;
495
496    // check if the metricsBalancer is MetricsStochasticBalancer before casting
497    if (metricsBalancer instanceof MetricsStochasticBalancer) {
498      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
499      // overall cost
500      balancer.updateStochasticCost(tableName.getNameAsString(),
501        "Overall", "Overall cost", overall);
502
503      // each cost function
504      for (int i = 0; i < costFunctions.size(); i++) {
505        CostFunction costFunction = costFunctions.get(i);
506        String costFunctionName = costFunction.getClass().getSimpleName();
507        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
508        // TODO: cost function may need a specific description
509        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
510          "The percent of " + costFunctionName, costPercent);
511      }
512    }
513  }
514
515  private String functionCost() {
516    StringBuilder builder = new StringBuilder();
517    for (CostFunction c:costFunctions) {
518      builder.append(c.getClass().getSimpleName());
519      builder.append(" : (");
520      builder.append(c.getMultiplier());
521      builder.append(", ");
522      builder.append(c.cost());
523      builder.append("); ");
524    }
525    return builder.toString();
526  }
527
528  /**
529   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
530   * state.
531   *
532   * @param cluster The state of the cluster
533   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
534   */
535  private List<RegionPlan> createRegionPlans(Cluster cluster) {
536    List<RegionPlan> plans = new LinkedList<>();
537    for (int regionIndex = 0;
538         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
539      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
540      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
541
542      if (initialServerIndex != newServerIndex) {
543        RegionInfo region = cluster.regions[regionIndex];
544        ServerName initialServer = cluster.servers[initialServerIndex];
545        ServerName newServer = cluster.servers[newServerIndex];
546
547        if (LOG.isTraceEnabled()) {
548          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
549              + initialServer.getHostname() + " to " + newServer.getHostname());
550        }
551        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
552        plans.add(rp);
553      }
554    }
555    return plans;
556  }
557
558  /**
559   * Store the current region loads.
560   */
561  private synchronized void updateRegionLoad() {
562    // We create a new hashmap so that regions that are no longer there are removed.
563    // However we temporarily need the old loads so we can use them to keep the rolling average.
564    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
565    loads = new HashMap<>();
566
567    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
568      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
569        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
570        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
571        if (rLoads == null) {
572          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
573        } else if (rLoads.size() >= numRegionLoadsToRemember) {
574          rLoads.remove();
575        }
576        rLoads.add(new BalancerRegionLoad(rm));
577        loads.put(regionNameAsString, rLoads);
578      });
579    });
580
581    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
582      cost.setLoads(loads);
583    }
584  }
585
586  protected void initCosts(Cluster cluster) {
587    for (CostFunction c:costFunctions) {
588      c.init(cluster);
589    }
590  }
591
592  protected void updateCostsWithAction(Cluster cluster, Action action) {
593    for (CostFunction c : costFunctions) {
594      c.postAction(action);
595    }
596  }
597
598  /**
599   * Get the names of the cost functions
600   */
601  public String[] getCostFunctionNames() {
602    if (costFunctions == null) return null;
603    String[] ret = new String[costFunctions.size()];
604    for (int i = 0; i < costFunctions.size(); i++) {
605      CostFunction c = costFunctions.get(i);
606      ret[i] = c.getClass().getSimpleName();
607    }
608
609    return ret;
610  }
611
612  /**
613   * This is the main cost function.  It will compute a cost associated with a proposed cluster
614   * state.  All different costs will be combined with their multipliers to produce a double cost.
615   *
616   * @param cluster The state of the cluster
617   * @param previousCost the previous cost. This is used as an early out.
618   * @return a double of a cost associated with the proposed cluster state.  This cost is an
619   *         aggregate of all individual cost functions.
620   */
621  protected double computeCost(Cluster cluster, double previousCost) {
622    double total = 0;
623
624    for (int i = 0; i < costFunctions.size(); i++) {
625      CostFunction c = costFunctions.get(i);
626      this.tempFunctionCosts[i] = 0.0;
627
628      if (c.getMultiplier() <= 0) {
629        continue;
630      }
631
632      Float multiplier = c.getMultiplier();
633      Double cost = c.cost();
634
635      this.tempFunctionCosts[i] = multiplier*cost;
636      total += this.tempFunctionCosts[i];
637
638      if (total > previousCost) {
639        break;
640      }
641    }
642
643    return total;
644  }
645
646  static class RandomCandidateGenerator extends CandidateGenerator {
647
648    @Override
649    Cluster.Action generate(Cluster cluster) {
650
651      int thisServer = pickRandomServer(cluster);
652
653      // Pick the other server
654      int otherServer = pickOtherRandomServer(cluster, thisServer);
655
656      return pickRandomRegions(cluster, thisServer, otherServer);
657    }
658  }
659
660  /**
661   * Generates candidates which moves the replicas out of the rack for
662   * co-hosted region replicas in the same rack
663   */
664  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
665    @Override
666    Cluster.Action generate(Cluster cluster) {
667      int rackIndex = pickRandomRack(cluster);
668      if (cluster.numRacks <= 1 || rackIndex == -1) {
669        return super.generate(cluster);
670      }
671
672      int regionIndex = selectCoHostedRegionPerGroup(
673        cluster.primariesOfRegionsPerRack[rackIndex],
674        cluster.regionsPerRack[rackIndex],
675        cluster.regionIndexToPrimaryIndex);
676
677      // if there are no pairs of region replicas co-hosted, default to random generator
678      if (regionIndex == -1) {
679        // default to randompicker
680        return randomGenerator.generate(cluster);
681      }
682
683      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
684      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
685
686      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
687      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
688      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
689      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
690    }
691  }
692
693  /**
694   * Base class of StochasticLoadBalancer's Cost Functions.
695   */
696  public abstract static class CostFunction {
697
698    private float multiplier = 0;
699
700    protected Cluster cluster;
701
702    public CostFunction(Configuration c) {
703    }
704
705    boolean isNeeded() {
706      return true;
707    }
708    float getMultiplier() {
709      return multiplier;
710    }
711
712    void setMultiplier(float m) {
713      this.multiplier = m;
714    }
715
716    /** Called once per LB invocation to give the cost function
717     * to initialize it's state, and perform any costly calculation.
718     */
719    void init(Cluster cluster) {
720      this.cluster = cluster;
721    }
722
723    /** Called once per cluster Action to give the cost function
724     * an opportunity to update it's state. postAction() is always
725     * called at least once before cost() is called with the cluster
726     * that this action is performed on. */
727    void postAction(Action action) {
728      switch (action.type) {
729      case NULL: break;
730      case ASSIGN_REGION:
731        AssignRegionAction ar = (AssignRegionAction) action;
732        regionMoved(ar.region, -1, ar.server);
733        break;
734      case MOVE_REGION:
735        MoveRegionAction mra = (MoveRegionAction) action;
736        regionMoved(mra.region, mra.fromServer, mra.toServer);
737        break;
738      case SWAP_REGIONS:
739        SwapRegionsAction a = (SwapRegionsAction) action;
740        regionMoved(a.fromRegion, a.fromServer, a.toServer);
741        regionMoved(a.toRegion, a.toServer, a.fromServer);
742        break;
743      default:
744        throw new RuntimeException("Uknown action:" + action.type);
745      }
746    }
747
748    protected void regionMoved(int region, int oldServer, int newServer) {
749    }
750
751    protected abstract double cost();
752
753    @SuppressWarnings("checkstyle:linelength")
754    /**
755     * Function to compute a scaled cost using
756     * {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
757     * It assumes that this is a zero sum set of costs.  It assumes that the worst case
758     * possible is all of the elements in one region server and the rest having 0.
759     *
760     * @param stats the costs
761     * @return a scaled set of costs.
762     */
763    protected double costFromArray(double[] stats) {
764      double totalCost = 0;
765      double total = getSum(stats);
766
767      double count = stats.length;
768      double mean = total/count;
769
770      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
771      // a zero sum cost for this to make sense.
772      double max = ((count - 1) * mean) + (total - mean);
773
774      // It's possible that there aren't enough regions to go around
775      double min;
776      if (count > total) {
777        min = ((count - total) * mean) + ((1 - mean) * total);
778      } else {
779        // Some will have 1 more than everything else.
780        int numHigh = (int) (total - (Math.floor(mean) * count));
781        int numLow = (int) (count - numHigh);
782
783        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
784
785      }
786      min = Math.max(0, min);
787      for (int i=0; i<stats.length; i++) {
788        double n = stats[i];
789        double diff = Math.abs(mean - n);
790        totalCost += diff;
791      }
792
793      double scaled =  scale(min, max, totalCost);
794      return scaled;
795    }
796
797    private double getSum(double[] stats) {
798      double total = 0;
799      for(double s:stats) {
800        total += s;
801      }
802      return total;
803    }
804
805    /**
806     * Scale the value between 0 and 1.
807     *
808     * @param min   Min value
809     * @param max   The Max value
810     * @param value The value to be scaled.
811     * @return The scaled value.
812     */
813    protected double scale(double min, double max, double value) {
814      if (max <= min || value <= min) {
815        return 0;
816      }
817      if ((max - min) == 0) return 0;
818
819      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
820    }
821  }
822
823  /**
824   * Given the starting state of the regions and a potential ending state
825   * compute cost based upon the number of regions that have moved.
826   */
827  static class MoveCostFunction extends CostFunction {
828    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
829    private static final String MAX_MOVES_PERCENT_KEY =
830        "hbase.master.balancer.stochastic.maxMovePercent";
831    private static final float DEFAULT_MOVE_COST = 7;
832    private static final int DEFAULT_MAX_MOVES = 600;
833    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
834
835    private final float maxMovesPercent;
836
837    MoveCostFunction(Configuration conf) {
838      super(conf);
839
840      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
841      // that large benefits are need to overcome the cost of a move.
842      this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
843      // What percent of the number of regions a single run of the balancer can move.
844      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
845    }
846
847    @Override
848    protected double cost() {
849      // Try and size the max number of Moves, but always be prepared to move some.
850      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
851          DEFAULT_MAX_MOVES);
852
853      double moveCost = cluster.numMovedRegions;
854
855      // Don't let this single balance move more than the max moves.
856      // This allows better scaling to accurately represent the actual cost of a move.
857      if (moveCost > maxMoves) {
858        return 1000000;   // return a number much greater than any of the other cost
859      }
860
861      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
862    }
863  }
864
865  /**
866   * Compute the cost of a potential cluster state from skew in number of
867   * regions on a cluster.
868   */
869  static class RegionCountSkewCostFunction extends CostFunction {
870    static final String REGION_COUNT_SKEW_COST_KEY =
871        "hbase.master.balancer.stochastic.regionCountCost";
872    static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
873
874    private double[] stats = null;
875
876    RegionCountSkewCostFunction(Configuration conf) {
877      super(conf);
878      // Load multiplier should be the greatest as it is the most general way to balance data.
879      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
880    }
881
882    @Override
883    void init(Cluster cluster) {
884      super.init(cluster);
885      LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
886          cluster.numServers, cluster.numRegions);
887      if (LOG.isTraceEnabled()) {
888        for (int i =0; i < cluster.numServers; i++) {
889          LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(),
890              cluster.servers[i], cluster.regionsPerServer[i].length);
891        }
892      }
893    }
894
895    @Override
896    protected double cost() {
897      if (stats == null || stats.length != cluster.numServers) {
898        stats = new double[cluster.numServers];
899      }
900      for (int i =0; i < cluster.numServers; i++) {
901        stats[i] = cluster.regionsPerServer[i].length;
902      }
903      return costFromArray(stats);
904    }
905  }
906
907  /**
908   * Compute the cost of a potential cluster state from skew in number of
909   * primary regions on a cluster.
910   */
911  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
912    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
913        "hbase.master.balancer.stochastic.primaryRegionCountCost";
914    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
915
916    private double[] stats = null;
917
918    PrimaryRegionCountSkewCostFunction(Configuration conf) {
919      super(conf);
920      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
921      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
922        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
923    }
924
925    @Override
926    boolean isNeeded() {
927      return cluster.hasRegionReplicas;
928    }
929
930    @Override
931    protected double cost() {
932      if (!cluster.hasRegionReplicas) {
933        return 0;
934      }
935      if (stats == null || stats.length != cluster.numServers) {
936        stats = new double[cluster.numServers];
937      }
938
939      for (int i = 0; i < cluster.numServers; i++) {
940        stats[i] = 0;
941        for (int regionIdx : cluster.regionsPerServer[i]) {
942          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
943            stats[i]++;
944          }
945        }
946      }
947
948      return costFromArray(stats);
949    }
950  }
951
952  /**
953   * Compute the cost of a potential cluster configuration based upon how evenly
954   * distributed tables are.
955   */
956  static class TableSkewCostFunction extends CostFunction {
957
958    private static final String TABLE_SKEW_COST_KEY =
959        "hbase.master.balancer.stochastic.tableSkewCost";
960    private static final float DEFAULT_TABLE_SKEW_COST = 35;
961
962    TableSkewCostFunction(Configuration conf) {
963      super(conf);
964      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
965    }
966
967    @Override
968    protected double cost() {
969      double max = cluster.numRegions;
970      double min = ((double) cluster.numRegions) / cluster.numServers;
971      double value = 0;
972
973      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
974        value += cluster.numMaxRegionsPerTable[i];
975      }
976
977      return scale(min, max, value);
978    }
979  }
980
981  /**
982   * Compute a cost of a potential cluster configuration based upon where
983   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
984   */
985  static abstract class LocalityBasedCostFunction extends CostFunction {
986
987    private final LocalityType type;
988
989    private double bestLocality; // best case locality across cluster weighted by local data size
990    private double locality; // current locality across cluster weighted by local data size
991
992    private MasterServices services;
993
994    LocalityBasedCostFunction(Configuration conf,
995                              MasterServices srv,
996                              LocalityType type,
997                              String localityCostKey,
998                              float defaultLocalityCost) {
999      super(conf);
1000      this.type = type;
1001      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1002      this.services = srv;
1003      this.locality = 0.0;
1004      this.bestLocality = 0.0;
1005    }
1006
1007    /**
1008     * Maps region to the current entity (server or rack) on which it is stored
1009     */
1010    abstract int regionIndexToEntityIndex(int region);
1011
1012    public void setServices(MasterServices srvc) {
1013      this.services = srvc;
1014    }
1015
1016    @Override
1017    void init(Cluster cluster) {
1018      super.init(cluster);
1019      locality = 0.0;
1020      bestLocality = 0.0;
1021
1022      // If no master, no computation will work, so assume 0 cost
1023      if (this.services == null) {
1024        return;
1025      }
1026
1027      for (int region = 0; region < cluster.numRegions; region++) {
1028        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1029        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1030      }
1031
1032      // We normalize locality to be a score between 0 and 1.0 representing how good it
1033      // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1034      // (and the cost is 0)
1035      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1036    }
1037
1038    @Override
1039    protected void regionMoved(int region, int oldServer, int newServer) {
1040      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1041      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1042      if (this.services == null) {
1043        return;
1044      }
1045      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1046      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1047      locality += normalizedDelta;
1048    }
1049
1050    @Override
1051    protected double cost() {
1052      return 1 - locality;
1053    }
1054
1055    private int getMostLocalEntityForRegion(int region) {
1056      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1057    }
1058
1059    private double getWeightedLocality(int region, int entity) {
1060      return cluster.getOrComputeWeightedLocality(region, entity, type);
1061    }
1062
1063  }
1064
1065  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1066
1067    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1068    private static final float DEFAULT_LOCALITY_COST = 25;
1069
1070    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1071      super(
1072          conf,
1073          srv,
1074          LocalityType.SERVER,
1075          LOCALITY_COST_KEY,
1076          DEFAULT_LOCALITY_COST
1077      );
1078    }
1079
1080    @Override
1081    int regionIndexToEntityIndex(int region) {
1082      return cluster.regionIndexToServerIndex[region];
1083    }
1084  }
1085
1086  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1087
1088    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1089    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1090
1091    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1092      super(
1093          conf,
1094          services,
1095          LocalityType.RACK,
1096          RACK_LOCALITY_COST_KEY,
1097          DEFAULT_RACK_LOCALITY_COST
1098      );
1099    }
1100
1101    @Override
1102    int regionIndexToEntityIndex(int region) {
1103      return cluster.getRackForRegion(region);
1104    }
1105  }
1106
1107  /**
1108   * Base class the allows writing costs functions from rolling average of some
1109   * number from RegionLoad.
1110   */
1111  abstract static class CostFromRegionLoadFunction extends CostFunction {
1112
1113    private ClusterMetrics clusterStatus = null;
1114    private Map<String, Deque<BalancerRegionLoad>> loads = null;
1115    private double[] stats = null;
1116    CostFromRegionLoadFunction(Configuration conf) {
1117      super(conf);
1118    }
1119
1120    void setClusterMetrics(ClusterMetrics status) {
1121      this.clusterStatus = status;
1122    }
1123
1124    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1125      this.loads = l;
1126    }
1127
1128    @Override
1129    protected double cost() {
1130      if (clusterStatus == null || loads == null) {
1131        return 0;
1132      }
1133
1134      if (stats == null || stats.length != cluster.numServers) {
1135        stats = new double[cluster.numServers];
1136      }
1137
1138      for (int i =0; i < stats.length; i++) {
1139        //Cost this server has from RegionLoad
1140        long cost = 0;
1141
1142        // for every region on this server get the rl
1143        for(int regionIndex:cluster.regionsPerServer[i]) {
1144          Collection<BalancerRegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1145
1146          // Now if we found a region load get the type of cost that was requested.
1147          if (regionLoadList != null) {
1148            cost = (long) (cost + getRegionLoadCost(regionLoadList));
1149          }
1150        }
1151
1152        // Add the total cost to the stats.
1153        stats[i] = cost;
1154      }
1155
1156      // Now return the scaled cost from data held in the stats object.
1157      return costFromArray(stats);
1158    }
1159
1160    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1161      double cost = 0;
1162      for (BalancerRegionLoad rl : regionLoadList) {
1163        cost += getCostFromRl(rl);
1164      }
1165      return cost / regionLoadList.size();
1166    }
1167
1168    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1169  }
1170
1171  /**
1172   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
1173   * We do not compare about the actual rate in requests per second but rather the rate relative
1174   * to the rest of the regions.
1175   */
1176  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1177
1178    CostFromRegionLoadAsRateFunction(Configuration conf) {
1179      super(conf);
1180    }
1181
1182    @Override
1183    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1184      double cost = 0;
1185      double previous = 0;
1186      boolean isFirst = true;
1187      for (BalancerRegionLoad rl : regionLoadList) {
1188        double current = getCostFromRl(rl);
1189        if (isFirst) {
1190          isFirst = false;
1191        } else {
1192          cost += current - previous;
1193        }
1194        previous = current;
1195      }
1196      return Math.max(0, cost / (regionLoadList.size() - 1));
1197    }
1198  }
1199
1200  /**
1201   * Compute the cost of total number of read requests  The more unbalanced the higher the
1202   * computed cost will be.  This uses a rolling average of regionload.
1203   */
1204
1205  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1206
1207    private static final String READ_REQUEST_COST_KEY =
1208        "hbase.master.balancer.stochastic.readRequestCost";
1209    private static final float DEFAULT_READ_REQUEST_COST = 5;
1210
1211    ReadRequestCostFunction(Configuration conf) {
1212      super(conf);
1213      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1214    }
1215
1216    @Override
1217    protected double getCostFromRl(BalancerRegionLoad rl) {
1218      return rl.getReadRequestsCount();
1219    }
1220  }
1221
1222  /**
1223   * Compute the cost of total number of write requests.  The more unbalanced the higher the
1224   * computed cost will be.  This uses a rolling average of regionload.
1225   */
1226  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1227
1228    private static final String WRITE_REQUEST_COST_KEY =
1229        "hbase.master.balancer.stochastic.writeRequestCost";
1230    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1231
1232    WriteRequestCostFunction(Configuration conf) {
1233      super(conf);
1234      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1235    }
1236
1237    @Override
1238    protected double getCostFromRl(BalancerRegionLoad rl) {
1239      return rl.getWriteRequestsCount();
1240    }
1241  }
1242
1243  /**
1244   * A cost function for region replicas. We give a very high cost to hosting
1245   * replicas of the same region in the same host. We do not prevent the case
1246   * though, since if numReplicas > numRegionServers, we still want to keep the
1247   * replica open.
1248   */
1249  static class RegionReplicaHostCostFunction extends CostFunction {
1250    private static final String REGION_REPLICA_HOST_COST_KEY =
1251        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1252    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1253
1254    long maxCost = 0;
1255    long[] costsPerGroup; // group is either server, host or rack
1256    int[][] primariesOfRegionsPerGroup;
1257
1258    public RegionReplicaHostCostFunction(Configuration conf) {
1259      super(conf);
1260      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1261        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1262    }
1263
1264    @Override
1265    void init(Cluster cluster) {
1266      super.init(cluster);
1267      // max cost is the case where every region replica is hosted together regardless of host
1268      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1269      costsPerGroup = new long[cluster.numHosts];
1270      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1271          ? cluster.primariesOfRegionsPerHost
1272          : cluster.primariesOfRegionsPerServer;
1273      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1274        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1275      }
1276    }
1277
1278    long getMaxCost(Cluster cluster) {
1279      if (!cluster.hasRegionReplicas) {
1280        return 0; // short circuit
1281      }
1282      // max cost is the case where every region replica is hosted together regardless of host
1283      int[] primariesOfRegions = new int[cluster.numRegions];
1284      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1285          cluster.regions.length);
1286
1287      Arrays.sort(primariesOfRegions);
1288
1289      // compute numReplicas from the sorted array
1290      return costPerGroup(primariesOfRegions);
1291    }
1292
1293    @Override
1294    boolean isNeeded() {
1295      return cluster.hasRegionReplicas;
1296    }
1297
1298    @Override
1299    protected double cost() {
1300      if (maxCost <= 0) {
1301        return 0;
1302      }
1303
1304      long totalCost = 0;
1305      for (int i = 0 ; i < costsPerGroup.length; i++) {
1306        totalCost += costsPerGroup[i];
1307      }
1308      return scale(0, maxCost, totalCost);
1309    }
1310
1311    /**
1312     * For each primary region, it computes the total number of replicas in the array (numReplicas)
1313     * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1314     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1315     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1316     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1317     * @return a sum of numReplicas-1 squared for each primary region in the group.
1318     */
1319    protected long costPerGroup(int[] primariesOfRegions) {
1320      long cost = 0;
1321      int currentPrimary = -1;
1322      int currentPrimaryIndex = -1;
1323      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1324      // sharing the same primary will have consecutive numbers in the array.
1325      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1326        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1327        if (primary != currentPrimary) { // we see a new primary
1328          int numReplicas = j - currentPrimaryIndex;
1329          // square the cost
1330          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1331            cost += (numReplicas - 1) * (numReplicas - 1);
1332          }
1333          currentPrimary = primary;
1334          currentPrimaryIndex = j;
1335        }
1336      }
1337
1338      return cost;
1339    }
1340
1341    @Override
1342    protected void regionMoved(int region, int oldServer, int newServer) {
1343      if (maxCost <= 0) {
1344        return; // no need to compute
1345      }
1346      if (cluster.multiServersPerHost) {
1347        int oldHost = cluster.serverIndexToHostIndex[oldServer];
1348        int newHost = cluster.serverIndexToHostIndex[newServer];
1349        if (newHost != oldHost) {
1350          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1351          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1352        }
1353      } else {
1354        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1355        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1356      }
1357    }
1358  }
1359
1360  /**
1361   * A cost function for region replicas for the rack distribution. We give a relatively high
1362   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1363   * though.
1364   */
1365  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1366    private static final String REGION_REPLICA_RACK_COST_KEY =
1367        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1368    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1369
1370    public RegionReplicaRackCostFunction(Configuration conf) {
1371      super(conf);
1372      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1373        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1374    }
1375
1376    @Override
1377    void init(Cluster cluster) {
1378      this.cluster = cluster;
1379      if (cluster.numRacks <= 1) {
1380        maxCost = 0;
1381        return; // disabled for 1 rack
1382      }
1383      // max cost is the case where every region replica is hosted together regardless of rack
1384      maxCost = getMaxCost(cluster);
1385      costsPerGroup = new long[cluster.numRacks];
1386      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1387        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1388      }
1389    }
1390
1391    @Override
1392    protected void regionMoved(int region, int oldServer, int newServer) {
1393      if (maxCost <= 0) {
1394        return; // no need to compute
1395      }
1396      int oldRack = cluster.serverIndexToRackIndex[oldServer];
1397      int newRack = cluster.serverIndexToRackIndex[newServer];
1398      if (newRack != oldRack) {
1399        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1400        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1401      }
1402    }
1403  }
1404
1405  /**
1406   * Compute the cost of total memstore size.  The more unbalanced the higher the
1407   * computed cost will be.  This uses a rolling average of regionload.
1408   */
1409  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
1410
1411    private static final String MEMSTORE_SIZE_COST_KEY =
1412        "hbase.master.balancer.stochastic.memstoreSizeCost";
1413    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1414
1415    MemStoreSizeCostFunction(Configuration conf) {
1416      super(conf);
1417      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1418    }
1419
1420    @Override
1421    protected double getCostFromRl(BalancerRegionLoad rl) {
1422      return rl.getMemStoreSizeMB();
1423    }
1424  }
1425  /**
1426   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1427   * computed cost will be.  This uses a rolling average of regionload.
1428   */
1429  static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1430
1431    private static final String STOREFILE_SIZE_COST_KEY =
1432        "hbase.master.balancer.stochastic.storefileSizeCost";
1433    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1434
1435    StoreFileCostFunction(Configuration conf) {
1436      super(conf);
1437      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1438    }
1439
1440    @Override
1441    protected double getCostFromRl(BalancerRegionLoad rl) {
1442      return rl.getStorefileSizeMB();
1443    }
1444  }
1445
1446  /**
1447   * A helper function to compose the attribute name from tablename and costfunction name
1448   */
1449  public static String composeAttributeName(String tableName, String costFunctionName) {
1450    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1451  }
1452}