001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Deque;
025import java.util.HashMap;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Objects;
030import java.util.Random;
031import java.util.stream.Collectors;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.ClusterMetrics;
035import org.apache.hadoop.hbase.HBaseInterfaceAudience;
036import org.apache.hadoop.hbase.RegionMetrics;
037import org.apache.hadoop.hbase.ServerMetrics;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.master.MasterServices;
042import org.apache.hadoop.hbase.master.RegionPlan;
043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
048import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.hadoop.hbase.util.ReflectionUtils;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
056import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
057
058
059/**
 * <p>This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
063 * <ul>
064 * <li>Region Load</li>
065 * <li>Table Load</li>
066 * <li>Data Locality</li>
067 * <li>Memstore Sizes</li>
068 * <li>Storefile Sizes</li>
069 * </ul>
070 *
071 *
 * <p>Every cost function returns a number between 0 and 1 inclusive, where 0 is the lowest-cost,
 * best solution and 1 is the highest possible cost and the worst solution. The computed costs are
 * scaled by their respective multipliers:</p>
075 *
076 * <ul>
077 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
078 *   <li>hbase.master.balancer.stochastic.moveCost</li>
079 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
080 *   <li>hbase.master.balancer.stochastic.localityCost</li>
081 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
082 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
083 * </ul>
084 *
 * <p>You can also add custom cost functions by setting the following configuration value:</p>
086 * <ul>
087 *     <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
088 * </ul>
089 *
 * <p>All custom cost functions need to extend {@link StochasticLoadBalancer.CostFunction}.</p>
091 *
092 * <p>In addition to the above configurations, the balancer can be tuned by the following
093 * configuration values:</p>
094 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
098 *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
099 *   regions is multiplied to try and get the number of times the balancer will
100 *   mutate all servers.</li>
101 *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
102 *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
103 *   value and the above computation.</li>
104 * </ul>
105 *
106 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
107 * so that the balancer gets the full picture of all loads on the cluster.</p>
108 */
109@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
110@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
111  justification="Complaint is about costFunctions not being synchronized; not end of the world")
112public class StochasticLoadBalancer extends BaseLoadBalancer {
113
114  protected static final String STEPS_PER_REGION_KEY =
115      "hbase.master.balancer.stochastic.stepsPerRegion";
116  protected static final String MAX_STEPS_KEY =
117      "hbase.master.balancer.stochastic.maxSteps";
118  protected static final String RUN_MAX_STEPS_KEY =
119      "hbase.master.balancer.stochastic.runMaxSteps";
120  protected static final String MAX_RUNNING_TIME_KEY =
121      "hbase.master.balancer.stochastic.maxRunningTime";
122  protected static final String KEEP_REGION_LOADS =
123      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
124  private static final String TABLE_FUNCTION_SEP = "_";
125  protected static final String MIN_COST_NEED_BALANCE_KEY =
126      "hbase.master.balancer.stochastic.minCostNeedBalance";
127  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
128          "hbase.master.balancer.stochastic.additionalCostFunctions";
129
130  protected static final Random RANDOM = new Random(System.currentTimeMillis());
131  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
132
133  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
134
135  // values are defaults
136  private int maxSteps = 1000000;
137  private boolean runMaxSteps = false;
138  private int stepsPerRegion = 800;
139  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
140  private int numRegionLoadsToRemember = 15;
141  private float minCostNeedBalance = 0.05f;
142
143  private List<CandidateGenerator> candidateGenerators;
144  private CostFromRegionLoadFunction[] regionLoadFunctions;
145  private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
146
147  // to save and report costs to JMX
148  private Double curOverallCost = 0d;
149  private Double[] tempFunctionCosts;
150  private Double[] curFunctionCosts;
151
152  // Keep locality based picker and cost function to alert them
153  // when new services are offered
154  private LocalityBasedCandidateGenerator localityCandidateGenerator;
155  private ServerLocalityCostFunction localityCost;
156  private RackLocalityCostFunction rackLocalityCost;
157  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
158  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
159
160  /**
161   * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
162   * default MetricsBalancer
163   */
164  public StochasticLoadBalancer() {
165    super(new MetricsStochasticBalancer());
166  }
167
168  @Override
169  public void onConfigurationChange(Configuration conf) {
170    setConf(conf);
171  }
172
173  @Override
174  public synchronized void setConf(Configuration conf) {
175    super.setConf(conf);
176    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
177    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
178    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
179    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
180
181    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
182    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
183    if (localityCandidateGenerator == null) {
184      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
185    }
186    localityCost = new ServerLocalityCostFunction(conf, services);
187    rackLocalityCost = new RackLocalityCostFunction(conf, services);
188
189    if (this.candidateGenerators == null) {
190      candidateGenerators = Lists.newArrayList();
191      candidateGenerators.add(new RandomCandidateGenerator());
192      candidateGenerators.add(new LoadCandidateGenerator());
193      candidateGenerators.add(localityCandidateGenerator);
194      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
195    }
196    regionLoadFunctions = new CostFromRegionLoadFunction[] {
197      new ReadRequestCostFunction(conf),
198      new CPRequestCostFunction(conf),
199      new WriteRequestCostFunction(conf),
200      new MemStoreSizeCostFunction(conf),
201      new StoreFileCostFunction(conf)
202    };
203    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
204    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
205
206    costFunctions = new ArrayList<>();
207    costFunctions.add(new RegionCountSkewCostFunction(conf));
208    costFunctions.add(new PrimaryRegionCountSkewCostFunction(conf));
209    costFunctions.add(new MoveCostFunction(conf));
210    costFunctions.add(localityCost);
211    costFunctions.add(rackLocalityCost);
212    costFunctions.add(new TableSkewCostFunction(conf));
213    costFunctions.add(regionReplicaHostCostFunction);
214    costFunctions.add(regionReplicaRackCostFunction);
215    costFunctions.add(regionLoadFunctions[0]);
216    costFunctions.add(regionLoadFunctions[1]);
217    costFunctions.add(regionLoadFunctions[2]);
218    costFunctions.add(regionLoadFunctions[3]);
219    costFunctions.add(regionLoadFunctions[4]);
220    loadCustomCostFunctions(conf);
221
222    curFunctionCosts= new Double[costFunctions.size()];
223    tempFunctionCosts= new Double[costFunctions.size()];
224    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
225            ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
226            Arrays.toString(getCostFunctionNames()) + " etc.");
227  }
228
229  private void loadCustomCostFunctions(Configuration conf) {
230    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
231
232    if (null == functionsNames) {
233      return;
234    }
235
236    costFunctions.addAll(Arrays.stream(functionsNames)
237            .map(c -> {
238              Class<? extends CostFunction> klass = null;
239              try {
240                klass = (Class<? extends CostFunction>) Class.forName(c);
241              } catch (ClassNotFoundException e) {
242                LOG.warn("Cannot load class " + c + "': " + e.getMessage());
243              }
244              if (null == klass) {
245                return null;
246              }
247
248              CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
249              LOG.info("Successfully loaded custom CostFunction '" +
250                      reflected.getClass().getSimpleName() + "'");
251
252              return reflected;
253            })
254            .filter(Objects::nonNull)
255            .collect(Collectors.toList()));
256  }
257
258  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
259    this.candidateGenerators = customCandidateGenerators;
260  }
261
262  @Override
263  protected void setSlop(Configuration conf) {
264    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
265  }
266
267  @Override
268  public synchronized void setClusterMetrics(ClusterMetrics st) {
269    super.setClusterMetrics(st);
270    updateRegionLoad();
271    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
272      cost.setClusterMetrics(st);
273    }
274
275    // update metrics size
276    try {
277      // by-table or ensemble mode
278      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
279      int functionsCount = getCostFunctionNames().length;
280
281      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
282    } catch (Exception e) {
283      LOG.error("failed to get the size of all tables", e);
284    }
285  }
286
287  /**
288   * Update the number of metrics that are reported to JMX
289   */
290  public void updateMetricsSize(int size) {
291    if (metricsBalancer instanceof MetricsStochasticBalancer) {
292        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
293    }
294  }
295
296  @Override
297  public synchronized void setMasterServices(MasterServices masterServices) {
298    super.setMasterServices(masterServices);
299    this.localityCost.setServices(masterServices);
300    this.rackLocalityCost.setServices(masterServices);
301    this.localityCandidateGenerator.setServices(masterServices);
302  }
303
304  @Override
305  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
306    regionReplicaHostCostFunction.init(c);
307    if (regionReplicaHostCostFunction.cost() > 0) return true;
308    regionReplicaRackCostFunction.init(c);
309    if (regionReplicaRackCostFunction.cost() > 0) return true;
310    return false;
311  }
312
313  @Override
314  protected boolean needsBalance(TableName tableName, Cluster cluster) {
315    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
316    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
317      if (LOG.isDebugEnabled()) {
318        LOG.debug("Not running balancer because only " + cs.getNumServers()
319            + " active regionserver(s)");
320      }
321      return false;
322    }
323    if (areSomeRegionReplicasColocated(cluster)) {
324      return true;
325    }
326
327    if (idleRegionServerExist(cluster)){
328      return true;
329    }
330
331    double total = 0.0;
332    float sumMultiplier = 0.0f;
333    for (CostFunction c : costFunctions) {
334      float multiplier = c.getMultiplier();
335      if (multiplier <= 0) {
336        LOG.trace("{} not needed because multiplier is <= 0", c.getClass().getSimpleName());
337        continue;
338      }
339      if (!c.isNeeded()) {
340        LOG.trace("{} not needed", c.getClass().getSimpleName());
341        continue;
342      }
343      sumMultiplier += multiplier;
344      total += c.cost() * multiplier;
345    }
346
347    boolean balanced = total <= 0 || sumMultiplier <= 0 ||
348        (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance);
349    if (LOG.isDebugEnabled()) {
350      LOG.debug("{} {}; total cost={}, sum multiplier={}; cost/multiplier to need a balance is {}",
351          balanced ? "Skipping load balancing because balanced" : "We need to load balance",
352          isByTable ? String.format("table (%s)", tableName) : "cluster",
353          total, sumMultiplier, minCostNeedBalance);
354      if (LOG.isTraceEnabled()) {
355        LOG.trace("Balance decision detailed function costs={}", functionCost());
356      }
357    }
358    return !balanced;
359  }
360
361  @VisibleForTesting
362  Cluster.Action nextAction(Cluster cluster) {
363    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
364            .generate(cluster);
365  }
366
367  /**
368   * Given the cluster state this will try and approach an optimal balance. This
369   * should always approach the optimal state given enough steps.
370   */
371  @Override
372  public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
373    List<RegionInfo>> loadOfOneTable) {
374    List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable);
375    if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
376      return plans;
377    }
378
379    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
380      if (loadOfOneTable.size() <= 2) {
381        return null;
382      }
383      loadOfOneTable = new HashMap<>(loadOfOneTable);
384      loadOfOneTable.remove(masterServerName);
385    }
386
387    // On clusters with lots of HFileLinks or lots of reference files,
388    // instantiating the storefile infos can be quite expensive.
389    // Allow turning this feature off if the locality cost is not going to
390    // be used in any computations.
391    RegionLocationFinder finder = null;
392    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
393        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
394      finder = this.regionFinder;
395    }
396
397    //The clusterState that is given to this method contains the state
398    //of all the regions in the table(s) (that's true today)
399    // Keep track of servers to iterate through them.
400    Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager);
401
402    long startTime = EnvironmentEdgeManager.currentTime();
403
404    initCosts(cluster);
405
406    if (!needsBalance(tableName, cluster)) {
407      return null;
408    }
409
410    double currentCost = computeCost(cluster, Double.MAX_VALUE);
411    curOverallCost = currentCost;
412    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
413    double initCost = currentCost;
414    double newCost = currentCost;
415
416    long computedMaxSteps;
417    if (runMaxSteps) {
418      computedMaxSteps = Math.max(this.maxSteps,
419          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
420    } else {
421      long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion *
422          (long)cluster.numServers;
423      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
424      if (calculatedMaxSteps > maxSteps) {
425        LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
426            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
427            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
428            + "(This config change does not require service restart)", calculatedMaxSteps,
429            maxSteps);
430      }
431    }
432    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
433        + functionCost() + " computedMaxSteps: " + computedMaxSteps);
434
435    // Perform a stochastic walk to see if we can get a good fit.
436    long step;
437
438    for (step = 0; step < computedMaxSteps; step++) {
439      Cluster.Action action = nextAction(cluster);
440
441      if (action.type == Type.NULL) {
442        continue;
443      }
444
445      cluster.doAction(action);
446      updateCostsWithAction(cluster, action);
447
448      newCost = computeCost(cluster, currentCost);
449
450      // Should this be kept?
451      if (newCost < currentCost) {
452        currentCost = newCost;
453
454        // save for JMX
455        curOverallCost = currentCost;
456        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
457      } else {
458        // Put things back the way they were before.
459        // TODO: undo by remembering old values
460        Action undoAction = action.undoAction();
461        cluster.doAction(undoAction);
462        updateCostsWithAction(cluster, undoAction);
463      }
464
465      if (EnvironmentEdgeManager.currentTime() - startTime >
466          maxRunningTime) {
467        break;
468      }
469    }
470    long endTime = EnvironmentEdgeManager.currentTime();
471
472    metricsBalancer.balanceCluster(endTime - startTime);
473
474    // update costs metrics
475    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
476    if (initCost > currentCost) {
477      plans = createRegionPlans(cluster);
478      LOG.info("Finished computing new load balance plan. Computation took {}" +
479        " to try {} different iterations.  Found a solution that moves " +
480        "{} regions; Going from a computed cost of {}" +
481        " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime),
482        step, plans.size(), initCost, currentCost);
483      return plans;
484    }
485    LOG.info("Could not find a better load balance plan.  Tried {} different configurations in " +
486      "{}, and did not find anything with a computed cost less than {}", step,
487      java.time.Duration.ofMillis(endTime - startTime), initCost);
488    return null;
489  }
490
491  /**
492   * update costs to JMX
493   */
494  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
495    if (tableName == null) return;
496
497    // check if the metricsBalancer is MetricsStochasticBalancer before casting
498    if (metricsBalancer instanceof MetricsStochasticBalancer) {
499      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
500      // overall cost
501      balancer.updateStochasticCost(tableName.getNameAsString(),
502        "Overall", "Overall cost", overall);
503
504      // each cost function
505      for (int i = 0; i < costFunctions.size(); i++) {
506        CostFunction costFunction = costFunctions.get(i);
507        String costFunctionName = costFunction.getClass().getSimpleName();
508        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
509        // TODO: cost function may need a specific description
510        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
511          "The percent of " + costFunctionName, costPercent);
512      }
513    }
514  }
515
516  private String functionCost() {
517    StringBuilder builder = new StringBuilder();
518    for (CostFunction c:costFunctions) {
519      builder.append(c.getClass().getSimpleName());
520      builder.append(" : (");
521      builder.append(c.getMultiplier());
522      builder.append(", ");
523      builder.append(c.cost());
524      builder.append("); ");
525    }
526    return builder.toString();
527  }
528
529  /**
530   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
531   * state.
532   *
533   * @param cluster The state of the cluster
534   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
535   */
536  private List<RegionPlan> createRegionPlans(Cluster cluster) {
537    List<RegionPlan> plans = new LinkedList<>();
538    for (int regionIndex = 0;
539         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
540      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
541      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
542
543      if (initialServerIndex != newServerIndex) {
544        RegionInfo region = cluster.regions[regionIndex];
545        ServerName initialServer = cluster.servers[initialServerIndex];
546        ServerName newServer = cluster.servers[newServerIndex];
547
548        if (LOG.isTraceEnabled()) {
549          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
550              + initialServer.getHostname() + " to " + newServer.getHostname());
551        }
552        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
553        plans.add(rp);
554      }
555    }
556    return plans;
557  }
558
559  /**
560   * Store the current region loads.
561   */
562  private synchronized void updateRegionLoad() {
563    // We create a new hashmap so that regions that are no longer there are removed.
564    // However we temporarily need the old loads so we can use them to keep the rolling average.
565    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
566    loads = new HashMap<>();
567
568    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
569      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
570        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
571        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
572        if (rLoads == null) {
573          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
574        } else if (rLoads.size() >= numRegionLoadsToRemember) {
575          rLoads.remove();
576        }
577        rLoads.add(new BalancerRegionLoad(rm));
578        loads.put(regionNameAsString, rLoads);
579      });
580    });
581
582    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
583      cost.setLoads(loads);
584    }
585  }
586
587  protected void initCosts(Cluster cluster) {
588    for (CostFunction c:costFunctions) {
589      c.init(cluster);
590    }
591  }
592
593  protected void updateCostsWithAction(Cluster cluster, Action action) {
594    for (CostFunction c : costFunctions) {
595      c.postAction(action);
596    }
597  }
598
599  /**
600   * Get the names of the cost functions
601   */
602  public String[] getCostFunctionNames() {
603    if (costFunctions == null) return null;
604    String[] ret = new String[costFunctions.size()];
605    for (int i = 0; i < costFunctions.size(); i++) {
606      CostFunction c = costFunctions.get(i);
607      ret[i] = c.getClass().getSimpleName();
608    }
609
610    return ret;
611  }
612
613  /**
614   * This is the main cost function.  It will compute a cost associated with a proposed cluster
615   * state.  All different costs will be combined with their multipliers to produce a double cost.
616   *
617   * @param cluster The state of the cluster
618   * @param previousCost the previous cost. This is used as an early out.
619   * @return a double of a cost associated with the proposed cluster state.  This cost is an
620   *         aggregate of all individual cost functions.
621   */
622  protected double computeCost(Cluster cluster, double previousCost) {
623    double total = 0;
624
625    for (int i = 0; i < costFunctions.size(); i++) {
626      CostFunction c = costFunctions.get(i);
627      this.tempFunctionCosts[i] = 0.0;
628
629      if (c.getMultiplier() <= 0) {
630        continue;
631      }
632
633      Float multiplier = c.getMultiplier();
634      Double cost = c.cost();
635
636      this.tempFunctionCosts[i] = multiplier*cost;
637      total += this.tempFunctionCosts[i];
638
639      if (total > previousCost) {
640        break;
641      }
642    }
643
644    return total;
645  }
646
647  static class RandomCandidateGenerator extends CandidateGenerator {
648
649    @Override
650    Cluster.Action generate(Cluster cluster) {
651
652      int thisServer = pickRandomServer(cluster);
653
654      // Pick the other server
655      int otherServer = pickOtherRandomServer(cluster, thisServer);
656
657      return pickRandomRegions(cluster, thisServer, otherServer);
658    }
659  }
660
661  /**
662   * Generates candidates which moves the replicas out of the rack for
663   * co-hosted region replicas in the same rack
664   */
665  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
666    @Override
667    Cluster.Action generate(Cluster cluster) {
668      int rackIndex = pickRandomRack(cluster);
669      if (cluster.numRacks <= 1 || rackIndex == -1) {
670        return super.generate(cluster);
671      }
672
673      int regionIndex = selectCoHostedRegionPerGroup(
674        cluster.primariesOfRegionsPerRack[rackIndex],
675        cluster.regionsPerRack[rackIndex],
676        cluster.regionIndexToPrimaryIndex);
677
678      // if there are no pairs of region replicas co-hosted, default to random generator
679      if (regionIndex == -1) {
680        // default to randompicker
681        return randomGenerator.generate(cluster);
682      }
683
684      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
685      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
686
687      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
688      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
689      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
690      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
691    }
692  }
693
694  /**
695   * Base class of StochasticLoadBalancer's Cost Functions.
696   */
697  public abstract static class CostFunction {
698
699    private float multiplier = 0;
700
701    protected Cluster cluster;
702
703    public CostFunction(Configuration c) {
704    }
705
706    boolean isNeeded() {
707      return true;
708    }
709    float getMultiplier() {
710      return multiplier;
711    }
712
713    void setMultiplier(float m) {
714      this.multiplier = m;
715    }
716
717    /** Called once per LB invocation to give the cost function
718     * to initialize it's state, and perform any costly calculation.
719     */
720    void init(Cluster cluster) {
721      this.cluster = cluster;
722    }
723
724    /** Called once per cluster Action to give the cost function
725     * an opportunity to update it's state. postAction() is always
726     * called at least once before cost() is called with the cluster
727     * that this action is performed on. */
728    void postAction(Action action) {
729      switch (action.type) {
730      case NULL: break;
731      case ASSIGN_REGION:
732        AssignRegionAction ar = (AssignRegionAction) action;
733        regionMoved(ar.region, -1, ar.server);
734        break;
735      case MOVE_REGION:
736        MoveRegionAction mra = (MoveRegionAction) action;
737        regionMoved(mra.region, mra.fromServer, mra.toServer);
738        break;
739      case SWAP_REGIONS:
740        SwapRegionsAction a = (SwapRegionsAction) action;
741        regionMoved(a.fromRegion, a.fromServer, a.toServer);
742        regionMoved(a.toRegion, a.toServer, a.fromServer);
743        break;
744      default:
745        throw new RuntimeException("Uknown action:" + action.type);
746      }
747    }
748
749    protected void regionMoved(int region, int oldServer, int newServer) {
750    }
751
752    protected abstract double cost();
753
754    @SuppressWarnings("checkstyle:linelength")
755    /**
756     * Function to compute a scaled cost using
757     * {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
758     * It assumes that this is a zero sum set of costs.  It assumes that the worst case
759     * possible is all of the elements in one region server and the rest having 0.
760     *
761     * @param stats the costs
762     * @return a scaled set of costs.
763     */
764    protected double costFromArray(double[] stats) {
765      double totalCost = 0;
766      double total = getSum(stats);
767
768      double count = stats.length;
769      double mean = total/count;
770
771      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
772      // a zero sum cost for this to make sense.
773      double max = ((count - 1) * mean) + (total - mean);
774
775      // It's possible that there aren't enough regions to go around
776      double min;
777      if (count > total) {
778        min = ((count - total) * mean) + ((1 - mean) * total);
779      } else {
780        // Some will have 1 more than everything else.
781        int numHigh = (int) (total - (Math.floor(mean) * count));
782        int numLow = (int) (count - numHigh);
783
784        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
785
786      }
787      min = Math.max(0, min);
788      for (int i=0; i<stats.length; i++) {
789        double n = stats[i];
790        double diff = Math.abs(mean - n);
791        totalCost += diff;
792      }
793
794      double scaled =  scale(min, max, totalCost);
795      return scaled;
796    }
797
798    private double getSum(double[] stats) {
799      double total = 0;
800      for(double s:stats) {
801        total += s;
802      }
803      return total;
804    }
805
806    /**
807     * Scale the value between 0 and 1.
808     *
809     * @param min   Min value
810     * @param max   The Max value
811     * @param value The value to be scaled.
812     * @return The scaled value.
813     */
814    protected double scale(double min, double max, double value) {
815      if (max <= min || value <= min) {
816        return 0;
817      }
818      if ((max - min) == 0) return 0;
819
820      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
821    }
822  }
823
824  /**
825   * Given the starting state of the regions and a potential ending state
826   * compute cost based upon the number of regions that have moved.
827   */
828  static class MoveCostFunction extends CostFunction {
829    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
830    private static final String MAX_MOVES_PERCENT_KEY =
831        "hbase.master.balancer.stochastic.maxMovePercent";
832    private static final float DEFAULT_MOVE_COST = 7;
833    private static final int DEFAULT_MAX_MOVES = 600;
834    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
835
836    private final float maxMovesPercent;
837
838    MoveCostFunction(Configuration conf) {
839      super(conf);
840
841      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
842      // that large benefits are need to overcome the cost of a move.
843      this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
844      // What percent of the number of regions a single run of the balancer can move.
845      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
846    }
847
848    @Override
849    protected double cost() {
850      // Try and size the max number of Moves, but always be prepared to move some.
851      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
852          DEFAULT_MAX_MOVES);
853
854      double moveCost = cluster.numMovedRegions;
855
856      // Don't let this single balance move more than the max moves.
857      // This allows better scaling to accurately represent the actual cost of a move.
858      if (moveCost > maxMoves) {
859        return 1000000;   // return a number much greater than any of the other cost
860      }
861
862      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
863    }
864  }
865
866  /**
867   * Compute the cost of a potential cluster state from skew in number of
868   * regions on a cluster.
869   */
870  static class RegionCountSkewCostFunction extends CostFunction {
871    static final String REGION_COUNT_SKEW_COST_KEY =
872        "hbase.master.balancer.stochastic.regionCountCost";
873    static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
874
875    private double[] stats = null;
876
877    RegionCountSkewCostFunction(Configuration conf) {
878      super(conf);
879      // Load multiplier should be the greatest as it is the most general way to balance data.
880      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
881    }
882
883    @Override
884    void init(Cluster cluster) {
885      super.init(cluster);
886      LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
887          cluster.numServers, cluster.numRegions);
888      if (LOG.isTraceEnabled()) {
889        for (int i =0; i < cluster.numServers; i++) {
890          LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(),
891              cluster.servers[i], cluster.regionsPerServer[i].length);
892        }
893      }
894    }
895
896    @Override
897    protected double cost() {
898      if (stats == null || stats.length != cluster.numServers) {
899        stats = new double[cluster.numServers];
900      }
901      for (int i =0; i < cluster.numServers; i++) {
902        stats[i] = cluster.regionsPerServer[i].length;
903      }
904      return costFromArray(stats);
905    }
906  }
907
908  /**
909   * Compute the cost of a potential cluster state from skew in number of
910   * primary regions on a cluster.
911   */
912  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
913    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
914        "hbase.master.balancer.stochastic.primaryRegionCountCost";
915    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
916
917    private double[] stats = null;
918
919    PrimaryRegionCountSkewCostFunction(Configuration conf) {
920      super(conf);
921      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
922      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
923        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
924    }
925
926    @Override
927    boolean isNeeded() {
928      return cluster.hasRegionReplicas;
929    }
930
931    @Override
932    protected double cost() {
933      if (!cluster.hasRegionReplicas) {
934        return 0;
935      }
936      if (stats == null || stats.length != cluster.numServers) {
937        stats = new double[cluster.numServers];
938      }
939
940      for (int i = 0; i < cluster.numServers; i++) {
941        stats[i] = 0;
942        for (int regionIdx : cluster.regionsPerServer[i]) {
943          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
944            stats[i]++;
945          }
946        }
947      }
948
949      return costFromArray(stats);
950    }
951  }
952
953  /**
954   * Compute the cost of a potential cluster configuration based upon how evenly
955   * distributed tables are.
956   */
957  static class TableSkewCostFunction extends CostFunction {
958
959    private static final String TABLE_SKEW_COST_KEY =
960        "hbase.master.balancer.stochastic.tableSkewCost";
961    private static final float DEFAULT_TABLE_SKEW_COST = 35;
962
963    TableSkewCostFunction(Configuration conf) {
964      super(conf);
965      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
966    }
967
968    @Override
969    protected double cost() {
970      double max = cluster.numRegions;
971      double min = ((double) cluster.numRegions) / cluster.numServers;
972      double value = 0;
973
974      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
975        value += cluster.numMaxRegionsPerTable[i];
976      }
977
978      return scale(min, max, value);
979    }
980  }
981
982  /**
983   * Compute a cost of a potential cluster configuration based upon where
984   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
985   */
986  static abstract class LocalityBasedCostFunction extends CostFunction {
987
988    private final LocalityType type;
989
990    private double bestLocality; // best case locality across cluster weighted by local data size
991    private double locality; // current locality across cluster weighted by local data size
992
993    private MasterServices services;
994
995    LocalityBasedCostFunction(Configuration conf,
996                              MasterServices srv,
997                              LocalityType type,
998                              String localityCostKey,
999                              float defaultLocalityCost) {
1000      super(conf);
1001      this.type = type;
1002      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1003      this.services = srv;
1004      this.locality = 0.0;
1005      this.bestLocality = 0.0;
1006    }
1007
1008    /**
1009     * Maps region to the current entity (server or rack) on which it is stored
1010     */
1011    abstract int regionIndexToEntityIndex(int region);
1012
1013    public void setServices(MasterServices srvc) {
1014      this.services = srvc;
1015    }
1016
1017    @Override
1018    void init(Cluster cluster) {
1019      super.init(cluster);
1020      locality = 0.0;
1021      bestLocality = 0.0;
1022
1023      // If no master, no computation will work, so assume 0 cost
1024      if (this.services == null) {
1025        return;
1026      }
1027
1028      for (int region = 0; region < cluster.numRegions; region++) {
1029        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1030        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1031      }
1032
1033      // We normalize locality to be a score between 0 and 1.0 representing how good it
1034      // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1035      // (and the cost is 0)
1036      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1037    }
1038
1039    @Override
1040    protected void regionMoved(int region, int oldServer, int newServer) {
1041      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1042      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1043      if (this.services == null) {
1044        return;
1045      }
1046      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1047      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1048      locality += normalizedDelta;
1049    }
1050
1051    @Override
1052    protected double cost() {
1053      return 1 - locality;
1054    }
1055
1056    private int getMostLocalEntityForRegion(int region) {
1057      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1058    }
1059
1060    private double getWeightedLocality(int region, int entity) {
1061      return cluster.getOrComputeWeightedLocality(region, entity, type);
1062    }
1063
1064  }
1065
1066  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1067
1068    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1069    private static final float DEFAULT_LOCALITY_COST = 25;
1070
1071    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1072      super(
1073          conf,
1074          srv,
1075          LocalityType.SERVER,
1076          LOCALITY_COST_KEY,
1077          DEFAULT_LOCALITY_COST
1078      );
1079    }
1080
1081    @Override
1082    int regionIndexToEntityIndex(int region) {
1083      return cluster.regionIndexToServerIndex[region];
1084    }
1085  }
1086
1087  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1088
1089    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1090    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1091
1092    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1093      super(
1094          conf,
1095          services,
1096          LocalityType.RACK,
1097          RACK_LOCALITY_COST_KEY,
1098          DEFAULT_RACK_LOCALITY_COST
1099      );
1100    }
1101
1102    @Override
1103    int regionIndexToEntityIndex(int region) {
1104      return cluster.getRackForRegion(region);
1105    }
1106  }
1107
1108  /**
1109   * Base class the allows writing costs functions from rolling average of some
1110   * number from RegionLoad.
1111   */
1112  abstract static class CostFromRegionLoadFunction extends CostFunction {
1113
1114    private ClusterMetrics clusterStatus = null;
1115    private Map<String, Deque<BalancerRegionLoad>> loads = null;
1116    private double[] stats = null;
1117    CostFromRegionLoadFunction(Configuration conf) {
1118      super(conf);
1119    }
1120
1121    void setClusterMetrics(ClusterMetrics status) {
1122      this.clusterStatus = status;
1123    }
1124
1125    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1126      this.loads = l;
1127    }
1128
1129    @Override
1130    protected double cost() {
1131      if (clusterStatus == null || loads == null) {
1132        return 0;
1133      }
1134
1135      if (stats == null || stats.length != cluster.numServers) {
1136        stats = new double[cluster.numServers];
1137      }
1138
1139      for (int i =0; i < stats.length; i++) {
1140        //Cost this server has from RegionLoad
1141        long cost = 0;
1142
1143        // for every region on this server get the rl
1144        for(int regionIndex:cluster.regionsPerServer[i]) {
1145          Collection<BalancerRegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1146
1147          // Now if we found a region load get the type of cost that was requested.
1148          if (regionLoadList != null) {
1149            cost = (long) (cost + getRegionLoadCost(regionLoadList));
1150          }
1151        }
1152
1153        // Add the total cost to the stats.
1154        stats[i] = cost;
1155      }
1156
1157      // Now return the scaled cost from data held in the stats object.
1158      return costFromArray(stats);
1159    }
1160
1161    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1162      double cost = 0;
1163      for (BalancerRegionLoad rl : regionLoadList) {
1164        cost += getCostFromRl(rl);
1165      }
1166      return cost / regionLoadList.size();
1167    }
1168
1169    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1170  }
1171
1172  /**
1173   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
1174   * We do not compare about the actual rate in requests per second but rather the rate relative
1175   * to the rest of the regions.
1176   */
1177  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1178
1179    CostFromRegionLoadAsRateFunction(Configuration conf) {
1180      super(conf);
1181    }
1182
1183    @Override
1184    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1185      double cost = 0;
1186      double previous = 0;
1187      boolean isFirst = true;
1188      for (BalancerRegionLoad rl : regionLoadList) {
1189        double current = getCostFromRl(rl);
1190        if (isFirst) {
1191          isFirst = false;
1192        } else {
1193          cost += current - previous;
1194        }
1195        previous = current;
1196      }
1197      return Math.max(0, cost / (regionLoadList.size() - 1));
1198    }
1199  }
1200
1201  /**
1202   * Compute the cost of total number of read requests  The more unbalanced the higher the
1203   * computed cost will be.  This uses a rolling average of regionload.
1204   */
1205
1206  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1207
1208    private static final String READ_REQUEST_COST_KEY =
1209        "hbase.master.balancer.stochastic.readRequestCost";
1210    private static final float DEFAULT_READ_REQUEST_COST = 5;
1211
1212    ReadRequestCostFunction(Configuration conf) {
1213      super(conf);
1214      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1215    }
1216
1217    @Override
1218    protected double getCostFromRl(BalancerRegionLoad rl) {
1219      return rl.getReadRequestsCount();
1220    }
1221  }
1222
1223  /**
1224   * Compute the cost of total number of coprocessor requests  The more unbalanced the higher the
1225   * computed cost will be.  This uses a rolling average of regionload.
1226   */
1227
1228  static class CPRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1229
1230    private static final String CP_REQUEST_COST_KEY =
1231        "hbase.master.balancer.stochastic.cpRequestCost";
1232    private static final float DEFAULT_CP_REQUEST_COST = 5;
1233
1234    CPRequestCostFunction(Configuration conf) {
1235      super(conf);
1236      this.setMultiplier(conf.getFloat(CP_REQUEST_COST_KEY, DEFAULT_CP_REQUEST_COST));
1237    }
1238
1239    @Override
1240    protected double getCostFromRl(BalancerRegionLoad rl) {
1241      return rl.getCpRequestsCount();
1242    }
1243  }
1244
1245  /**
1246   * Compute the cost of total number of write requests.  The more unbalanced the higher the
1247   * computed cost will be.  This uses a rolling average of regionload.
1248   */
1249  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1250
1251    private static final String WRITE_REQUEST_COST_KEY =
1252        "hbase.master.balancer.stochastic.writeRequestCost";
1253    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1254
1255    WriteRequestCostFunction(Configuration conf) {
1256      super(conf);
1257      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1258    }
1259
1260    @Override
1261    protected double getCostFromRl(BalancerRegionLoad rl) {
1262      return rl.getWriteRequestsCount();
1263    }
1264  }
1265
1266  /**
1267   * A cost function for region replicas. We give a very high cost to hosting
1268   * replicas of the same region in the same host. We do not prevent the case
1269   * though, since if numReplicas > numRegionServers, we still want to keep the
1270   * replica open.
1271   */
1272  static class RegionReplicaHostCostFunction extends CostFunction {
1273    private static final String REGION_REPLICA_HOST_COST_KEY =
1274        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1275    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1276
1277    long maxCost = 0;
1278    long[] costsPerGroup; // group is either server, host or rack
1279    int[][] primariesOfRegionsPerGroup;
1280
1281    public RegionReplicaHostCostFunction(Configuration conf) {
1282      super(conf);
1283      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1284        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1285    }
1286
1287    @Override
1288    void init(Cluster cluster) {
1289      super.init(cluster);
1290      // max cost is the case where every region replica is hosted together regardless of host
1291      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1292      costsPerGroup = new long[cluster.numHosts];
1293      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1294          ? cluster.primariesOfRegionsPerHost
1295          : cluster.primariesOfRegionsPerServer;
1296      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1297        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1298      }
1299    }
1300
1301    long getMaxCost(Cluster cluster) {
1302      if (!cluster.hasRegionReplicas) {
1303        return 0; // short circuit
1304      }
1305      // max cost is the case where every region replica is hosted together regardless of host
1306      int[] primariesOfRegions = new int[cluster.numRegions];
1307      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1308          cluster.regions.length);
1309
1310      Arrays.sort(primariesOfRegions);
1311
1312      // compute numReplicas from the sorted array
1313      return costPerGroup(primariesOfRegions);
1314    }
1315
1316    @Override
1317    boolean isNeeded() {
1318      return cluster.hasRegionReplicas;
1319    }
1320
1321    @Override
1322    protected double cost() {
1323      if (maxCost <= 0) {
1324        return 0;
1325      }
1326
1327      long totalCost = 0;
1328      for (int i = 0 ; i < costsPerGroup.length; i++) {
1329        totalCost += costsPerGroup[i];
1330      }
1331      return scale(0, maxCost, totalCost);
1332    }
1333
1334    /**
1335     * For each primary region, it computes the total number of replicas in the array (numReplicas)
1336     * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1337     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1338     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1339     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1340     * @return a sum of numReplicas-1 squared for each primary region in the group.
1341     */
1342    protected long costPerGroup(int[] primariesOfRegions) {
1343      long cost = 0;
1344      int currentPrimary = -1;
1345      int currentPrimaryIndex = -1;
1346      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1347      // sharing the same primary will have consecutive numbers in the array.
1348      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1349        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1350        if (primary != currentPrimary) { // we see a new primary
1351          int numReplicas = j - currentPrimaryIndex;
1352          // square the cost
1353          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1354            cost += (numReplicas - 1) * (numReplicas - 1);
1355          }
1356          currentPrimary = primary;
1357          currentPrimaryIndex = j;
1358        }
1359      }
1360
1361      return cost;
1362    }
1363
1364    @Override
1365    protected void regionMoved(int region, int oldServer, int newServer) {
1366      if (maxCost <= 0) {
1367        return; // no need to compute
1368      }
1369      if (cluster.multiServersPerHost) {
1370        int oldHost = cluster.serverIndexToHostIndex[oldServer];
1371        int newHost = cluster.serverIndexToHostIndex[newServer];
1372        if (newHost != oldHost) {
1373          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1374          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1375        }
1376      } else {
1377        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1378        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1379      }
1380    }
1381  }
1382
1383  /**
1384   * A cost function for region replicas for the rack distribution. We give a relatively high
1385   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1386   * though.
1387   */
1388  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1389    private static final String REGION_REPLICA_RACK_COST_KEY =
1390        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1391    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1392
1393    public RegionReplicaRackCostFunction(Configuration conf) {
1394      super(conf);
1395      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1396        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1397    }
1398
1399    @Override
1400    void init(Cluster cluster) {
1401      this.cluster = cluster;
1402      if (cluster.numRacks <= 1) {
1403        maxCost = 0;
1404        return; // disabled for 1 rack
1405      }
1406      // max cost is the case where every region replica is hosted together regardless of rack
1407      maxCost = getMaxCost(cluster);
1408      costsPerGroup = new long[cluster.numRacks];
1409      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1410        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1411      }
1412    }
1413
1414    @Override
1415    protected void regionMoved(int region, int oldServer, int newServer) {
1416      if (maxCost <= 0) {
1417        return; // no need to compute
1418      }
1419      int oldRack = cluster.serverIndexToRackIndex[oldServer];
1420      int newRack = cluster.serverIndexToRackIndex[newServer];
1421      if (newRack != oldRack) {
1422        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1423        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1424      }
1425    }
1426  }
1427
1428  /**
1429   * Compute the cost of total memstore size.  The more unbalanced the higher the
1430   * computed cost will be.  This uses a rolling average of regionload.
1431   */
1432  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
1433
1434    private static final String MEMSTORE_SIZE_COST_KEY =
1435        "hbase.master.balancer.stochastic.memstoreSizeCost";
1436    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1437
1438    MemStoreSizeCostFunction(Configuration conf) {
1439      super(conf);
1440      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1441    }
1442
1443    @Override
1444    protected double getCostFromRl(BalancerRegionLoad rl) {
1445      return rl.getMemStoreSizeMB();
1446    }
1447  }
1448
1449  /**
1450   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1451   * computed cost will be.  This uses a rolling average of regionload.
1452   */
1453  static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1454
1455    private static final String STOREFILE_SIZE_COST_KEY =
1456        "hbase.master.balancer.stochastic.storefileSizeCost";
1457    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1458
1459    StoreFileCostFunction(Configuration conf) {
1460      super(conf);
1461      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1462    }
1463
1464    @Override
1465    protected double getCostFromRl(BalancerRegionLoad rl) {
1466      return rl.getStorefileSizeMB();
1467    }
1468  }
1469
1470  /**
1471   * A helper function to compose the attribute name from tablename and costfunction name
1472   */
1473  public static String composeAttributeName(String tableName, String costFunctionName) {
1474    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1475  }
1476}