001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Deque;
025import java.util.HashMap;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Objects;
030import java.util.Random;
031import java.util.stream.Collectors;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.ClusterMetrics;
035import org.apache.hadoop.hbase.HBaseInterfaceAudience;
036import org.apache.hadoop.hbase.RegionMetrics;
037import org.apache.hadoop.hbase.ServerMetrics;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.BalancerDecision;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.master.MasterServices;
043import org.apache.hadoop.hbase.master.RegionPlan;
044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
048import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
049import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
050import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
051import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
052import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.hadoop.hbase.util.ReflectionUtils;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
060import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
061
062
063/**
 * <p>This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
067 * <ul>
068 * <li>Region Load</li>
069 * <li>Table Load</li>
070 * <li>Data Locality</li>
071 * <li>Memstore Sizes</li>
072 * <li>Storefile Sizes</li>
073 * </ul>
074 *
075 *
076 * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
077 * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
078 * scaled by their respective multipliers:</p>
079 *
080 * <ul>
081 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
082 *   <li>hbase.master.balancer.stochastic.moveCost</li>
083 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
084 *   <li>hbase.master.balancer.stochastic.localityCost</li>
085 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
086 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
087 * </ul>
088 *
 * <p>You can also add custom cost functions by setting the following configuration value:</p>
090 * <ul>
091 *     <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
092 * </ul>
093 *
 * <p>All custom cost functions must extend {@link StochasticLoadBalancer.CostFunction}.</p>
095 *
096 * <p>In addition to the above configurations, the balancer can be tuned by the following
097 * configuration values:</p>
098 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions, which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
 *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
 *   regions times the number of servers is multiplied to get the number of steps the balancer
 *   will try when mutating the cluster.</li>
 *   <li>hbase.master.balancer.stochastic.maxSteps, which bounds the number of steps the balancer
 *   will try. The balancer uses the minimum of this value and the above computation (or the
 *   maximum, when hbase.master.balancer.stochastic.runMaxSteps is true).</li>
108 * </ul>
109 *
110 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
111 * so that the balancer gets the full picture of all loads on the cluster.</p>
112 */
113@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
114@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
115  justification="Complaint is about costFunctions not being synchronized; not end of the world")
116public class StochasticLoadBalancer extends BaseLoadBalancer {
117
118  protected static final String STEPS_PER_REGION_KEY =
119      "hbase.master.balancer.stochastic.stepsPerRegion";
120  protected static final String MAX_STEPS_KEY =
121      "hbase.master.balancer.stochastic.maxSteps";
122  protected static final String RUN_MAX_STEPS_KEY =
123      "hbase.master.balancer.stochastic.runMaxSteps";
124  protected static final String MAX_RUNNING_TIME_KEY =
125      "hbase.master.balancer.stochastic.maxRunningTime";
126  protected static final String KEEP_REGION_LOADS =
127      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
128  private static final String TABLE_FUNCTION_SEP = "_";
129  protected static final String MIN_COST_NEED_BALANCE_KEY =
130      "hbase.master.balancer.stochastic.minCostNeedBalance";
131  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
132          "hbase.master.balancer.stochastic.additionalCostFunctions";
133
134  protected static final Random RANDOM = new Random(System.currentTimeMillis());
135  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
136
137  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
138
139  // values are defaults
140  private int maxSteps = 1000000;
141  private boolean runMaxSteps = false;
142  private int stepsPerRegion = 800;
143  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
144  private int numRegionLoadsToRemember = 15;
145  private float minCostNeedBalance = 0.05f;
146
147  private List<CandidateGenerator> candidateGenerators;
148  private CostFromRegionLoadFunction[] regionLoadFunctions;
149  private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
150
151  // to save and report costs to JMX
152  private Double curOverallCost = 0d;
153  private Double[] tempFunctionCosts;
154  private Double[] curFunctionCosts;
155
156  // Keep locality based picker and cost function to alert them
157  // when new services are offered
158  private LocalityBasedCandidateGenerator localityCandidateGenerator;
159  private ServerLocalityCostFunction localityCost;
160  private RackLocalityCostFunction rackLocalityCost;
161  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
162  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
163
164  /**
165   * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
166   * default MetricsBalancer
167   */
168  public StochasticLoadBalancer() {
169    super(new MetricsStochasticBalancer());
170  }
171
172  @Override
173  public void onConfigurationChange(Configuration conf) {
174    setConf(conf);
175  }
176
177  @Override
178  public synchronized void setConf(Configuration conf) {
179    super.setConf(conf);
180    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
181    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
182    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
183    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
184
185    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
186    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
187    if (localityCandidateGenerator == null) {
188      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
189    }
190    localityCost = new ServerLocalityCostFunction(conf, services);
191    rackLocalityCost = new RackLocalityCostFunction(conf, services);
192
193    if (this.candidateGenerators == null) {
194      candidateGenerators = Lists.newArrayList();
195      candidateGenerators.add(new RandomCandidateGenerator());
196      candidateGenerators.add(new LoadCandidateGenerator());
197      candidateGenerators.add(localityCandidateGenerator);
198      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
199    }
200    regionLoadFunctions = new CostFromRegionLoadFunction[] {
201      new ReadRequestCostFunction(conf),
202      new CPRequestCostFunction(conf),
203      new WriteRequestCostFunction(conf),
204      new MemStoreSizeCostFunction(conf),
205      new StoreFileCostFunction(conf)
206    };
207    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
208    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
209
210    costFunctions = new ArrayList<>();
211    addCostFunction(new RegionCountSkewCostFunction(conf));
212    addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
213    addCostFunction(new MoveCostFunction(conf));
214    addCostFunction(localityCost);
215    addCostFunction(rackLocalityCost);
216    addCostFunction(new TableSkewCostFunction(conf));
217    addCostFunction(regionReplicaHostCostFunction);
218    addCostFunction(regionReplicaRackCostFunction);
219    addCostFunction(regionLoadFunctions[0]);
220    addCostFunction(regionLoadFunctions[1]);
221    addCostFunction(regionLoadFunctions[2]);
222    addCostFunction(regionLoadFunctions[3]);
223    addCostFunction(regionLoadFunctions[4]);
224    loadCustomCostFunctions(conf);
225
226    curFunctionCosts= new Double[costFunctions.size()];
227    tempFunctionCosts= new Double[costFunctions.size()];
228
229    boolean isBalancerDecisionRecording = getConf()
230      .getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
231        BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
232    if (this.namedQueueRecorder == null && isBalancerDecisionRecording) {
233      this.namedQueueRecorder = NamedQueueRecorder.getInstance(getConf());
234    }
235
236    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
237            ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
238            Arrays.toString(getCostFunctionNames()) + " etc.");
239  }
240
241  private void loadCustomCostFunctions(Configuration conf) {
242    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
243
244    if (null == functionsNames) {
245      return;
246    }
247
248    costFunctions.addAll(Arrays.stream(functionsNames).map(c -> {
249      Class<? extends CostFunction> klass = null;
250      try {
251        klass = (Class<? extends CostFunction>) Class.forName(c);
252      } catch (ClassNotFoundException e) {
253        LOG.warn("Cannot load class " + c + "': " + e.getMessage());
254      }
255      if (null == klass) {
256        return null;
257      }
258      CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
259      LOG.info(
260        "Successfully loaded custom CostFunction '" + reflected.getClass().getSimpleName() + "'");
261      return reflected;
262    }).filter(Objects::nonNull).collect(Collectors.toList()));
263  }
264
265  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
266    this.candidateGenerators = customCandidateGenerators;
267  }
268
269  @Override
270  protected void setSlop(Configuration conf) {
271    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
272  }
273
274  @Override
275  public synchronized void setClusterMetrics(ClusterMetrics st) {
276    super.setClusterMetrics(st);
277    updateRegionLoad();
278    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
279      cost.setClusterMetrics(st);
280    }
281
282    // update metrics size
283    try {
284      // by-table or ensemble mode
285      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
286      int functionsCount = getCostFunctionNames().length;
287
288      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
289    } catch (Exception e) {
290      LOG.error("failed to get the size of all tables", e);
291    }
292  }
293
294  /**
295   * Update the number of metrics that are reported to JMX
296   */
297  public void updateMetricsSize(int size) {
298    if (metricsBalancer instanceof MetricsStochasticBalancer) {
299        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
300    }
301  }
302
303  @Override
304  public synchronized void setMasterServices(MasterServices masterServices) {
305    super.setMasterServices(masterServices);
306    this.localityCost.setServices(masterServices);
307    this.rackLocalityCost.setServices(masterServices);
308    this.localityCandidateGenerator.setServices(masterServices);
309  }
310
311  @Override
312  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
313    regionReplicaHostCostFunction.init(c);
314    if (regionReplicaHostCostFunction.cost() > 0) return true;
315    regionReplicaRackCostFunction.init(c);
316    if (regionReplicaRackCostFunction.cost() > 0) return true;
317    return false;
318  }
319
320  @Override
321  protected boolean needsBalance(TableName tableName, Cluster cluster) {
322    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
323    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
324      if (LOG.isDebugEnabled()) {
325        LOG.debug("Not running balancer because only " + cs.getNumServers()
326            + " active regionserver(s)");
327      }
328      return false;
329    }
330    if (areSomeRegionReplicasColocated(cluster)) {
331      return true;
332    }
333
334    if (idleRegionServerExist(cluster)){
335      return true;
336    }
337
338    double total = 0.0;
339    float sumMultiplier = 0.0f;
340    for (CostFunction c : costFunctions) {
341      float multiplier = c.getMultiplier();
342      if (multiplier <= 0) {
343        LOG.trace("{} not needed because multiplier is <= 0", c.getClass().getSimpleName());
344        continue;
345      }
346      if (!c.isNeeded()) {
347        LOG.trace("{} not needed", c.getClass().getSimpleName());
348        continue;
349      }
350      sumMultiplier += multiplier;
351      total += c.cost() * multiplier;
352    }
353
354    boolean balanced = total <= 0 || sumMultiplier <= 0 ||
355        (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance);
356    if (LOG.isDebugEnabled()) {
357      LOG.debug("{} {}; total cost={}, sum multiplier={}; cost/multiplier to need a balance is {}",
358          balanced ? "Skipping load balancing because balanced" : "We need to load balance",
359          isByTable ? String.format("table (%s)", tableName) : "cluster",
360          total, sumMultiplier, minCostNeedBalance);
361      if (LOG.isTraceEnabled()) {
362        LOG.trace("Balance decision detailed function costs={}", functionCost());
363      }
364    }
365    return !balanced;
366  }
367
368  @VisibleForTesting
369  Cluster.Action nextAction(Cluster cluster) {
370    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
371            .generate(cluster);
372  }
373
  /**
   * Given the cluster state this will try and approach an optimal balance. This
   * should always approach the optimal state given enough steps.
   * <p>
   * The search is a randomized walk. Each step asks a randomly chosen candidate generator for an
   * action and applies it. The action is kept only if the overall cost drops; otherwise it is
   * undone. The walk ends after the computed step budget, or earlier once more than
   * {@code maxRunningTime} milliseconds have elapsed.
   *
   * @param tableName table being balanced; used in log messages and for per-table JMX cost
   *          metrics (which are skipped when it is {@code null})
   * @param loadOfOneTable map from server to the regions assigned to it
   * @return the moves leading to the cheapest state found; or the non-null result of
   *         {@code balanceMasterRegions}; or {@code null} when balancing is not needed or no
   *         cheaper state was found
   */
  @Override
  public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
    List<RegionInfo>> loadOfOneTable) {
    // If balanceMasterRegions produced plans they are returned as-is; with at most one server
    // there is nothing to move.
    List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable);
    if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
      return plans;
    }

    // Keep the master's own server out of the walk. If only one other server remains there is
    // nothing to balance. The map is copied first so the caller's map is left untouched.
    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
      if (loadOfOneTable.size() <= 2) {
        return null;
      }
      loadOfOneTable = new HashMap<>(loadOfOneTable);
      loadOfOneTable.remove(masterServerName);
    }

    // On clusters with lots of HFileLinks or lots of reference files,
    // instantiating the storefile infos can be quite expensive.
    // Allow turning this feature off if the locality cost is not going to
    // be used in any computations.
    RegionLocationFinder finder = null;
    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
      finder = this.regionFinder;
    }

    //The clusterState that is given to this method contains the state
    //of all the regions in the table(s) (that's true today)
    // Keep track of servers to iterate through them.
    Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager);

    long startTime = EnvironmentEdgeManager.currentTime();

    // Every cost function gets to set up its state for this cluster before any cost() call.
    initCosts(cluster);

    if (!needsBalance(tableName, cluster)) {
      return null;
    }

    // Baseline cost. Double.MAX_VALUE effectively disables computeCost's early exit, so every
    // per-function cost is filled in before being captured for JMX.
    double currentCost = computeCost(cluster, Double.MAX_VALUE);
    curOverallCost = currentCost;
    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
    double initCost = currentCost;
    double newCost;

    // Step budget = regions * stepsPerRegion * servers. It is capped by maxSteps, or raised to at
    // least maxSteps when runMaxSteps is set. Longs avoid int overflow in the product.
    long computedMaxSteps;
    if (runMaxSteps) {
      computedMaxSteps = Math.max(this.maxSteps,
          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
    } else {
      long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion *
          (long)cluster.numServers;
      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
      if (calculatedMaxSteps > maxSteps) {
        LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
            + "(This config change does not require service restart)", calculatedMaxSteps,
            maxSteps);
      }
    }
    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
        + functionCost() + " computedMaxSteps: " + computedMaxSteps);

    // Captured before the walk so the recorded decision can show the before/after breakdown.
    final String initFunctionTotalCosts = totalCostsPerFunc();
    // Perform a stochastic walk to see if we can get a good fit.
    long step;

    for (step = 0; step < computedMaxSteps; step++) {
      Cluster.Action action = nextAction(cluster);

      // A generator found nothing to propose; this still consumes a step.
      if (action.type == Type.NULL) {
        continue;
      }

      cluster.doAction(action);
      updateCostsWithAction(cluster, action);

      // currentCost lets computeCost stop summing as soon as the candidate is already worse.
      newCost = computeCost(cluster, currentCost);

      // Should this be kept?
      if (newCost < currentCost) {
        currentCost = newCost;

        // save for JMX
        curOverallCost = currentCost;
        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
      } else {
        // Put things back the way they were before.
        // TODO: undo by remembering old values
        Action undoAction = action.undoAction();
        cluster.doAction(undoAction);
        updateCostsWithAction(cluster, undoAction);
      }

      // NOTE(review): when this time limit fires, `step` is not incremented for the final
      // iteration, so the step count logged/recorded below is one lower than the steps tried.
      if (EnvironmentEdgeManager.currentTime() - startTime >
          maxRunningTime) {
        break;
      }
    }
    long endTime = EnvironmentEdgeManager.currentTime();

    metricsBalancer.balanceCluster(endTime - startTime);

    // update costs metrics
    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
    // Only emit plans when the walk found a strictly cheaper state than the starting one.
    if (initCost > currentCost) {
      plans = createRegionPlans(cluster);
      LOG.info("Finished computing new load balance plan. Computation took {}" +
        " to try {} different iterations.  Found a solution that moves " +
        "{} regions; Going from a computed cost of {}" +
        " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime),
        step, plans.size(), initCost, currentCost);
      sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
      return plans;
    }
    LOG.info("Could not find a better load balance plan.  Tried {} different configurations in " +
      "{}, and did not find anything with a computed cost less than {}", step,
      java.time.Duration.ofMillis(endTime - startTime), initCost);
    return null;
  }
499
500  private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
501      double initCost, String initFunctionTotalCosts, long step) {
502    if (this.namedQueueRecorder != null) {
503      List<String> regionPlans = new ArrayList<>();
504      for (RegionPlan plan : plans) {
505        regionPlans.add(
506          "table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
507            + " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
508      }
509      BalancerDecision balancerDecision =
510        new BalancerDecision.Builder()
511          .setInitTotalCost(initCost)
512          .setInitialFunctionCosts(initFunctionTotalCosts)
513          .setComputedTotalCost(currentCost)
514          .setFinalFunctionCosts(totalCostsPerFunc())
515          .setComputedSteps(step)
516          .setRegionPlans(regionPlans).build();
517      namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision));
518    }
519  }
520
521  /**
522   * update costs to JMX
523   */
524  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
525    if (tableName == null) return;
526
527    // check if the metricsBalancer is MetricsStochasticBalancer before casting
528    if (metricsBalancer instanceof MetricsStochasticBalancer) {
529      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
530      // overall cost
531      balancer.updateStochasticCost(tableName.getNameAsString(),
532        "Overall", "Overall cost", overall);
533
534      // each cost function
535      for (int i = 0; i < costFunctions.size(); i++) {
536        CostFunction costFunction = costFunctions.get(i);
537        String costFunctionName = costFunction.getClass().getSimpleName();
538        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
539        // TODO: cost function may need a specific description
540        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
541          "The percent of " + costFunctionName, costPercent);
542      }
543    }
544  }
545
546  private void addCostFunction(CostFunction costFunction) {
547    if (costFunction.getMultiplier() > 0) {
548      costFunctions.add(costFunction);
549    }
550  }
551
552  private String functionCost() {
553    StringBuilder builder = new StringBuilder();
554    for (CostFunction c:costFunctions) {
555      builder.append(c.getClass().getSimpleName());
556      builder.append(" : (");
557      builder.append(c.getMultiplier());
558      builder.append(", ");
559      builder.append(c.cost());
560      builder.append("); ");
561    }
562    return builder.toString();
563  }
564
565  private String totalCostsPerFunc() {
566    StringBuilder builder = new StringBuilder();
567    for (CostFunction c : costFunctions) {
568      if (c.getMultiplier() * c.cost() > 0.0) {
569        builder.append(" ");
570        builder.append(c.getClass().getSimpleName());
571        builder.append(" : ");
572        builder.append(c.getMultiplier() * c.cost());
573        builder.append(";");
574      }
575    }
576    if (builder.length() > 0) {
577      builder.deleteCharAt(builder.length() - 1);
578    }
579    return builder.toString();
580  }
581
582  /**
583   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
584   * state.
585   *
586   * @param cluster The state of the cluster
587   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
588   */
589  private List<RegionPlan> createRegionPlans(Cluster cluster) {
590    List<RegionPlan> plans = new LinkedList<>();
591    for (int regionIndex = 0;
592         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
593      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
594      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
595
596      if (initialServerIndex != newServerIndex) {
597        RegionInfo region = cluster.regions[regionIndex];
598        ServerName initialServer = cluster.servers[initialServerIndex];
599        ServerName newServer = cluster.servers[newServerIndex];
600
601        if (LOG.isTraceEnabled()) {
602          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
603              + initialServer.getHostname() + " to " + newServer.getHostname());
604        }
605        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
606        plans.add(rp);
607      }
608    }
609    return plans;
610  }
611
612  /**
613   * Store the current region loads.
614   */
615  private synchronized void updateRegionLoad() {
616    // We create a new hashmap so that regions that are no longer there are removed.
617    // However we temporarily need the old loads so we can use them to keep the rolling average.
618    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
619    loads = new HashMap<>();
620
621    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
622      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
623        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
624        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
625        if (rLoads == null) {
626          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
627        } else if (rLoads.size() >= numRegionLoadsToRemember) {
628          rLoads.remove();
629        }
630        rLoads.add(new BalancerRegionLoad(rm));
631        loads.put(regionNameAsString, rLoads);
632      });
633    });
634
635    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
636      cost.setLoads(loads);
637    }
638  }
639
640  protected void initCosts(Cluster cluster) {
641    for (CostFunction c:costFunctions) {
642      c.init(cluster);
643    }
644  }
645
646  protected void updateCostsWithAction(Cluster cluster, Action action) {
647    for (CostFunction c : costFunctions) {
648      c.postAction(action);
649    }
650  }
651
652  /**
653   * Get the names of the cost functions
654   */
655  public String[] getCostFunctionNames() {
656    if (costFunctions == null) return null;
657    String[] ret = new String[costFunctions.size()];
658    for (int i = 0; i < costFunctions.size(); i++) {
659      CostFunction c = costFunctions.get(i);
660      ret[i] = c.getClass().getSimpleName();
661    }
662
663    return ret;
664  }
665
666  /**
667   * This is the main cost function.  It will compute a cost associated with a proposed cluster
668   * state.  All different costs will be combined with their multipliers to produce a double cost.
669   *
670   * @param cluster The state of the cluster
671   * @param previousCost the previous cost. This is used as an early out.
672   * @return a double of a cost associated with the proposed cluster state.  This cost is an
673   *         aggregate of all individual cost functions.
674   */
675  protected double computeCost(Cluster cluster, double previousCost) {
676    double total = 0;
677
678    for (int i = 0; i < costFunctions.size(); i++) {
679      CostFunction c = costFunctions.get(i);
680      this.tempFunctionCosts[i] = 0.0;
681
682      if (c.getMultiplier() <= 0) {
683        continue;
684      }
685
686      Float multiplier = c.getMultiplier();
687      Double cost = c.cost();
688
689      this.tempFunctionCosts[i] = multiplier*cost;
690      total += this.tempFunctionCosts[i];
691
692      if (total > previousCost) {
693        break;
694      }
695    }
696
697    return total;
698  }
699
700  static class RandomCandidateGenerator extends CandidateGenerator {
701
702    @Override
703    Cluster.Action generate(Cluster cluster) {
704
705      int thisServer = pickRandomServer(cluster);
706
707      // Pick the other server
708      int otherServer = pickOtherRandomServer(cluster, thisServer);
709
710      return pickRandomRegions(cluster, thisServer, otherServer);
711    }
712  }
713
714  /**
715   * Generates candidates which moves the replicas out of the rack for
716   * co-hosted region replicas in the same rack
717   */
718  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
719    @Override
720    Cluster.Action generate(Cluster cluster) {
721      int rackIndex = pickRandomRack(cluster);
722      if (cluster.numRacks <= 1 || rackIndex == -1) {
723        return super.generate(cluster);
724      }
725
726      int regionIndex = selectCoHostedRegionPerGroup(
727        cluster.primariesOfRegionsPerRack[rackIndex],
728        cluster.regionsPerRack[rackIndex],
729        cluster.regionIndexToPrimaryIndex);
730
731      // if there are no pairs of region replicas co-hosted, default to random generator
732      if (regionIndex == -1) {
733        // default to randompicker
734        return randomGenerator.generate(cluster);
735      }
736
737      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
738      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
739
740      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
741      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
742      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
743      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
744    }
745  }
746
747  /**
748   * Base class of StochasticLoadBalancer's Cost Functions.
749   */
750  public abstract static class CostFunction {
751
752    private float multiplier = 0;
753
754    protected Cluster cluster;
755
756    public CostFunction(Configuration c) {
757    }
758
759    boolean isNeeded() {
760      return true;
761    }
762    float getMultiplier() {
763      return multiplier;
764    }
765
766    void setMultiplier(float m) {
767      this.multiplier = m;
768    }
769
770    /** Called once per LB invocation to give the cost function
771     * to initialize it's state, and perform any costly calculation.
772     */
773    void init(Cluster cluster) {
774      this.cluster = cluster;
775    }
776
777    /** Called once per cluster Action to give the cost function
778     * an opportunity to update it's state. postAction() is always
779     * called at least once before cost() is called with the cluster
780     * that this action is performed on. */
781    void postAction(Action action) {
782      switch (action.type) {
783      case NULL: break;
784      case ASSIGN_REGION:
785        AssignRegionAction ar = (AssignRegionAction) action;
786        regionMoved(ar.region, -1, ar.server);
787        break;
788      case MOVE_REGION:
789        MoveRegionAction mra = (MoveRegionAction) action;
790        regionMoved(mra.region, mra.fromServer, mra.toServer);
791        break;
792      case SWAP_REGIONS:
793        SwapRegionsAction a = (SwapRegionsAction) action;
794        regionMoved(a.fromRegion, a.fromServer, a.toServer);
795        regionMoved(a.toRegion, a.toServer, a.fromServer);
796        break;
797      default:
798        throw new RuntimeException("Uknown action:" + action.type);
799      }
800    }
801
802    protected void regionMoved(int region, int oldServer, int newServer) {
803    }
804
805    protected abstract double cost();
806
807    @SuppressWarnings("checkstyle:linelength")
808    /**
809     * Function to compute a scaled cost using
810     * {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
811     * It assumes that this is a zero sum set of costs.  It assumes that the worst case
812     * possible is all of the elements in one region server and the rest having 0.
813     *
814     * @param stats the costs
815     * @return a scaled set of costs.
816     */
817    protected double costFromArray(double[] stats) {
818      double totalCost = 0;
819      double total = getSum(stats);
820
821      double count = stats.length;
822      double mean = total/count;
823
824      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
825      // a zero sum cost for this to make sense.
826      double max = ((count - 1) * mean) + (total - mean);
827
828      // It's possible that there aren't enough regions to go around
829      double min;
830      if (count > total) {
831        min = ((count - total) * mean) + ((1 - mean) * total);
832      } else {
833        // Some will have 1 more than everything else.
834        int numHigh = (int) (total - (Math.floor(mean) * count));
835        int numLow = (int) (count - numHigh);
836
837        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
838
839      }
840      min = Math.max(0, min);
841      for (int i=0; i<stats.length; i++) {
842        double n = stats[i];
843        double diff = Math.abs(mean - n);
844        totalCost += diff;
845      }
846
847      double scaled =  scale(min, max, totalCost);
848      return scaled;
849    }
850
851    private double getSum(double[] stats) {
852      double total = 0;
853      for(double s:stats) {
854        total += s;
855      }
856      return total;
857    }
858
859    /**
860     * Scale the value between 0 and 1.
861     *
862     * @param min   Min value
863     * @param max   The Max value
864     * @param value The value to be scaled.
865     * @return The scaled value.
866     */
867    protected double scale(double min, double max, double value) {
868      if (max <= min || value <= min) {
869        return 0;
870      }
871      if ((max - min) == 0) return 0;
872
873      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
874    }
875  }
876
877  /**
878   * Given the starting state of the regions and a potential ending state
879   * compute cost based upon the number of regions that have moved.
880   */
881  static class MoveCostFunction extends CostFunction {
882    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
883    private static final String MOVE_COST_OFFPEAK_KEY =
884      "hbase.master.balancer.stochastic.moveCost.offpeak";
885    private static final String MAX_MOVES_PERCENT_KEY =
886        "hbase.master.balancer.stochastic.maxMovePercent";
887    static final float DEFAULT_MOVE_COST = 7;
888    static final float DEFAULT_MOVE_COST_OFFPEAK = 3;
889    private static final int DEFAULT_MAX_MOVES = 600;
890    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
891
892    private final float maxMovesPercent;
893    private final Configuration conf;
894
895    MoveCostFunction(Configuration conf) {
896      super(conf);
897      this.conf = conf;
898      // What percent of the number of regions a single run of the balancer can move.
899      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
900    }
901
902    @Override
903    protected double cost() {
904      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
905      // that large benefits are need to overcome the cost of a move.
906      if (OffPeakHours.getInstance(conf).isOffPeakHour()) {
907        this.setMultiplier(conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK));
908      } else {
909        this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
910      }
911      // Try and size the max number of Moves, but always be prepared to move some.
912      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
913          DEFAULT_MAX_MOVES);
914
915      double moveCost = cluster.numMovedRegions;
916
917      // Don't let this single balance move more than the max moves.
918      // This allows better scaling to accurately represent the actual cost of a move.
919      if (moveCost > maxMoves) {
920        return 1000000;   // return a number much greater than any of the other cost
921      }
922
923      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
924    }
925  }
926
927  /**
928   * Compute the cost of a potential cluster state from skew in number of
929   * regions on a cluster.
930   */
931  static class RegionCountSkewCostFunction extends CostFunction {
932    static final String REGION_COUNT_SKEW_COST_KEY =
933        "hbase.master.balancer.stochastic.regionCountCost";
934    static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
935
936    private double[] stats = null;
937
938    RegionCountSkewCostFunction(Configuration conf) {
939      super(conf);
940      // Load multiplier should be the greatest as it is the most general way to balance data.
941      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
942    }
943
944    @Override
945    void init(Cluster cluster) {
946      super.init(cluster);
947      LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
948          cluster.numServers, cluster.numRegions);
949      if (LOG.isTraceEnabled()) {
950        for (int i =0; i < cluster.numServers; i++) {
951          LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(),
952              cluster.servers[i], cluster.regionsPerServer[i].length);
953        }
954      }
955    }
956
957    @Override
958    protected double cost() {
959      if (stats == null || stats.length != cluster.numServers) {
960        stats = new double[cluster.numServers];
961      }
962      for (int i =0; i < cluster.numServers; i++) {
963        stats[i] = cluster.regionsPerServer[i].length;
964      }
965      return costFromArray(stats);
966    }
967  }
968
969  /**
970   * Compute the cost of a potential cluster state from skew in number of
971   * primary regions on a cluster.
972   */
973  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
974    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
975        "hbase.master.balancer.stochastic.primaryRegionCountCost";
976    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
977
978    private double[] stats = null;
979
980    PrimaryRegionCountSkewCostFunction(Configuration conf) {
981      super(conf);
982      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
983      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
984        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
985    }
986
987    @Override
988    boolean isNeeded() {
989      return cluster.hasRegionReplicas;
990    }
991
992    @Override
993    protected double cost() {
994      if (!cluster.hasRegionReplicas) {
995        return 0;
996      }
997      if (stats == null || stats.length != cluster.numServers) {
998        stats = new double[cluster.numServers];
999      }
1000
1001      for (int i = 0; i < cluster.numServers; i++) {
1002        stats[i] = 0;
1003        for (int regionIdx : cluster.regionsPerServer[i]) {
1004          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1005            stats[i]++;
1006          }
1007        }
1008      }
1009
1010      return costFromArray(stats);
1011    }
1012  }
1013
1014  /**
1015   * Compute the cost of a potential cluster configuration based upon how evenly
1016   * distributed tables are.
1017   */
1018  static class TableSkewCostFunction extends CostFunction {
1019
1020    private static final String TABLE_SKEW_COST_KEY =
1021        "hbase.master.balancer.stochastic.tableSkewCost";
1022    private static final float DEFAULT_TABLE_SKEW_COST = 35;
1023
1024    TableSkewCostFunction(Configuration conf) {
1025      super(conf);
1026      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1027    }
1028
1029    @Override
1030    protected double cost() {
1031      double max = cluster.numRegions;
1032      double min = ((double) cluster.numRegions) / cluster.numServers;
1033      double value = 0;
1034
1035      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1036        value += cluster.numMaxRegionsPerTable[i];
1037      }
1038
1039      return scale(min, max, value);
1040    }
1041  }
1042
1043  /**
1044   * Compute a cost of a potential cluster configuration based upon where
1045   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
1046   */
1047  static abstract class LocalityBasedCostFunction extends CostFunction {
1048
1049    private final LocalityType type;
1050
1051    private double bestLocality; // best case locality across cluster weighted by local data size
1052    private double locality; // current locality across cluster weighted by local data size
1053
1054    private MasterServices services;
1055
1056    LocalityBasedCostFunction(Configuration conf,
1057                              MasterServices srv,
1058                              LocalityType type,
1059                              String localityCostKey,
1060                              float defaultLocalityCost) {
1061      super(conf);
1062      this.type = type;
1063      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1064      this.services = srv;
1065      this.locality = 0.0;
1066      this.bestLocality = 0.0;
1067    }
1068
1069    /**
1070     * Maps region to the current entity (server or rack) on which it is stored
1071     */
1072    abstract int regionIndexToEntityIndex(int region);
1073
1074    public void setServices(MasterServices srvc) {
1075      this.services = srvc;
1076    }
1077
1078    @Override
1079    void init(Cluster cluster) {
1080      super.init(cluster);
1081      locality = 0.0;
1082      bestLocality = 0.0;
1083
1084      // If no master, no computation will work, so assume 0 cost
1085      if (this.services == null) {
1086        return;
1087      }
1088
1089      for (int region = 0; region < cluster.numRegions; region++) {
1090        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1091        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1092      }
1093
1094      // We normalize locality to be a score between 0 and 1.0 representing how good it
1095      // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1096      // (and the cost is 0)
1097      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1098    }
1099
1100    @Override
1101    protected void regionMoved(int region, int oldServer, int newServer) {
1102      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1103      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1104      if (this.services == null) {
1105        return;
1106      }
1107      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1108      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1109      locality += normalizedDelta;
1110    }
1111
1112    @Override
1113    protected double cost() {
1114      return 1 - locality;
1115    }
1116
1117    private int getMostLocalEntityForRegion(int region) {
1118      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1119    }
1120
1121    private double getWeightedLocality(int region, int entity) {
1122      return cluster.getOrComputeWeightedLocality(region, entity, type);
1123    }
1124
1125  }
1126
1127  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1128
1129    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1130    private static final float DEFAULT_LOCALITY_COST = 25;
1131
1132    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1133      super(
1134          conf,
1135          srv,
1136          LocalityType.SERVER,
1137          LOCALITY_COST_KEY,
1138          DEFAULT_LOCALITY_COST
1139      );
1140    }
1141
1142    @Override
1143    int regionIndexToEntityIndex(int region) {
1144      return cluster.regionIndexToServerIndex[region];
1145    }
1146  }
1147
1148  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1149
1150    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1151    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1152
1153    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1154      super(
1155          conf,
1156          services,
1157          LocalityType.RACK,
1158          RACK_LOCALITY_COST_KEY,
1159          DEFAULT_RACK_LOCALITY_COST
1160      );
1161    }
1162
1163    @Override
1164    int regionIndexToEntityIndex(int region) {
1165      return cluster.getRackForRegion(region);
1166    }
1167  }
1168
1169  /**
   * Base class that allows writing cost functions from a rolling average of some
   * number from RegionLoad.
1172   */
1173  abstract static class CostFromRegionLoadFunction extends CostFunction {
1174
1175    private ClusterMetrics clusterStatus = null;
1176    private Map<String, Deque<BalancerRegionLoad>> loads = null;
1177    private double[] stats = null;
1178    CostFromRegionLoadFunction(Configuration conf) {
1179      super(conf);
1180    }
1181
1182    void setClusterMetrics(ClusterMetrics status) {
1183      this.clusterStatus = status;
1184    }
1185
1186    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1187      this.loads = l;
1188    }
1189
1190    @Override
1191    protected double cost() {
1192      if (clusterStatus == null || loads == null) {
1193        return 0;
1194      }
1195
1196      if (stats == null || stats.length != cluster.numServers) {
1197        stats = new double[cluster.numServers];
1198      }
1199
1200      for (int i =0; i < stats.length; i++) {
1201        //Cost this server has from RegionLoad
1202        long cost = 0;
1203
1204        // for every region on this server get the rl
1205        for(int regionIndex:cluster.regionsPerServer[i]) {
1206          Collection<BalancerRegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1207
1208          // Now if we found a region load get the type of cost that was requested.
1209          if (regionLoadList != null) {
1210            cost = (long) (cost + getRegionLoadCost(regionLoadList));
1211          }
1212        }
1213
1214        // Add the total cost to the stats.
1215        stats[i] = cost;
1216      }
1217
1218      // Now return the scaled cost from data held in the stats object.
1219      return costFromArray(stats);
1220    }
1221
1222    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1223      double cost = 0;
1224      for (BalancerRegionLoad rl : regionLoadList) {
1225        cost += getCostFromRl(rl);
1226      }
1227      return cost / regionLoadList.size();
1228    }
1229
1230    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1231  }
1232
1233  /**
1234   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
   * We do not care about the actual rate in requests per second but rather the rate relative
1236   * to the rest of the regions.
1237   */
1238  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1239
1240    CostFromRegionLoadAsRateFunction(Configuration conf) {
1241      super(conf);
1242    }
1243
1244    @Override
1245    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1246      double cost = 0;
1247      double previous = 0;
1248      boolean isFirst = true;
1249      for (BalancerRegionLoad rl : regionLoadList) {
1250        double current = getCostFromRl(rl);
1251        if (isFirst) {
1252          isFirst = false;
1253        } else {
1254          cost += current - previous;
1255        }
1256        previous = current;
1257      }
1258      return Math.max(0, cost / (regionLoadList.size() - 1));
1259    }
1260  }
1261
1262  /**
   * Compute the cost of total number of read requests. The more unbalanced the higher the
1264   * computed cost will be.  This uses a rolling average of regionload.
1265   */
1266
1267  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1268
1269    private static final String READ_REQUEST_COST_KEY =
1270        "hbase.master.balancer.stochastic.readRequestCost";
1271    private static final float DEFAULT_READ_REQUEST_COST = 5;
1272
1273    ReadRequestCostFunction(Configuration conf) {
1274      super(conf);
1275      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1276    }
1277
1278    @Override
1279    protected double getCostFromRl(BalancerRegionLoad rl) {
1280      return rl.getReadRequestsCount();
1281    }
1282  }
1283
1284  /**
   * Compute the cost of total number of coprocessor requests. The more unbalanced the higher the
1286   * computed cost will be.  This uses a rolling average of regionload.
1287   */
1288
1289  static class CPRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1290
1291    private static final String CP_REQUEST_COST_KEY =
1292        "hbase.master.balancer.stochastic.cpRequestCost";
1293    private static final float DEFAULT_CP_REQUEST_COST = 5;
1294
1295    CPRequestCostFunction(Configuration conf) {
1296      super(conf);
1297      this.setMultiplier(conf.getFloat(CP_REQUEST_COST_KEY, DEFAULT_CP_REQUEST_COST));
1298    }
1299
1300    @Override
1301    protected double getCostFromRl(BalancerRegionLoad rl) {
1302      return rl.getCpRequestsCount();
1303    }
1304  }
1305
1306  /**
1307   * Compute the cost of total number of write requests.  The more unbalanced the higher the
1308   * computed cost will be.  This uses a rolling average of regionload.
1309   */
1310  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1311
1312    private static final String WRITE_REQUEST_COST_KEY =
1313        "hbase.master.balancer.stochastic.writeRequestCost";
1314    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1315
1316    WriteRequestCostFunction(Configuration conf) {
1317      super(conf);
1318      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1319    }
1320
1321    @Override
1322    protected double getCostFromRl(BalancerRegionLoad rl) {
1323      return rl.getWriteRequestsCount();
1324    }
1325  }
1326
1327  /**
1328   * A cost function for region replicas. We give a very high cost to hosting
1329   * replicas of the same region in the same host. We do not prevent the case
1330   * though, since if numReplicas > numRegionServers, we still want to keep the
1331   * replica open.
1332   */
  static class RegionReplicaHostCostFunction extends CostFunction {
    private static final String REGION_REPLICA_HOST_COST_KEY =
        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;

    // Cost of the worst placement (all replicas of each region together). A value <= 0 makes
    // cost() return 0 and regionMoved() a no-op.
    long maxCost = 0;
    long[] costsPerGroup; // group is either server, host or rack
    // Per-group arrays of primary region indexes, fed to costPerGroup(). Presumably each array
    // is sorted, as costPerGroup() requires; verify in Cluster.
    int[][] primariesOfRegionsPerGroup;

    public RegionReplicaHostCostFunction(Configuration conf) {
      super(conf);
      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
    }

    /**
     * Computes maxCost and the initial cost of every group. Groups are hosts when
     * cluster.multiServersPerHost is set, otherwise individual servers.
     */
    @Override
    void init(Cluster cluster) {
      super.init(cluster);
      // max cost is the case where every region replica is hosted together regardless of host
      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
      // NOTE(review): sized by numHosts even in server mode; this presumes numHosts == numServers
      // whenever multiServersPerHost is false.
      costsPerGroup = new long[cluster.numHosts];
      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
          ? cluster.primariesOfRegionsPerHost
          : cluster.primariesOfRegionsPerServer;
      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
      }
    }

    /**
     * Computes the cost of putting every region (all replicas) into one group. It copies
     * regionIndexToPrimaryIndex, sorts the copy and runs costPerGroup() over it.
     *
     * @return that worst-case cost, or 0 when the cluster has no region replicas
     */
    long getMaxCost(Cluster cluster) {
      if (!cluster.hasRegionReplicas) {
        return 0; // short circuit
      }
      // max cost is the case where every region replica is hosted together regardless of host
      int[] primariesOfRegions = new int[cluster.numRegions];
      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
          cluster.regions.length);

      // Sorting places replicas sharing a primary next to each other, as costPerGroup() expects.
      Arrays.sort(primariesOfRegions);

      // compute numReplicas from the sorted array
      return costPerGroup(primariesOfRegions);
    }

    /** Only relevant when the cluster has region replicas. */
    @Override
    boolean isNeeded() {
      return cluster.hasRegionReplicas;
    }

    /** @return the sum of all group costs scaled between 0 and maxCost; 0 if maxCost <= 0 */
    @Override
    protected double cost() {
      if (maxCost <= 0) {
        return 0;
      }

      long totalCost = 0;
      for (int i = 0 ; i < costsPerGroup.length; i++) {
        totalCost += costsPerGroup[i];
      }
      return scale(0, maxCost, totalCost);
    }

    /**
     * For each primary region, it computes the total number of replicas in the array (numReplicas)
     * and returns a sum of numReplicas-1 squared. For example, if the server hosts
     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
     * @return a sum of numReplicas-1 squared for each primary region in the group.
     */
    protected long costPerGroup(int[] primariesOfRegions) {
      long cost = 0;
      // The initial -1 / -1 state is a placeholder run: at j == 0 it yields numReplicas == 1 and
      // contributes nothing (this relies on primary ids being non-negative).
      int currentPrimary = -1;
      int currentPrimaryIndex = -1;
      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
      // sharing the same primary will have consecutive numbers in the array.
      // The loop runs one step past the end with primary = -1 as a sentinel to flush the last run.
      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
        if (primary != currentPrimary) { // we see a new primary
          int numReplicas = j - currentPrimaryIndex;
          // square the cost
          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
            cost += (numReplicas - 1) * (numReplicas - 1);
          }
          currentPrimary = primary;
          currentPrimaryIndex = j;
        }
      }

      return cost;
    }

    /**
     * Recomputes the cost of the source and destination groups after a move. In host mode, a
     * move between two servers on the same host leaves the host groups unchanged, so nothing is
     * recomputed.
     */
    @Override
    protected void regionMoved(int region, int oldServer, int newServer) {
      if (maxCost <= 0) {
        return; // no need to compute
      }
      if (cluster.multiServersPerHost) {
        int oldHost = cluster.serverIndexToHostIndex[oldServer];
        int newHost = cluster.serverIndexToHostIndex[newServer];
        if (newHost != oldHost) {
          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
        }
      } else {
        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
      }
    }
  }
1443
1444  /**
1445   * A cost function for region replicas for the rack distribution. We give a relatively high
1446   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1447   * though.
1448   */
1449  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1450    private static final String REGION_REPLICA_RACK_COST_KEY =
1451        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1452    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1453
1454    public RegionReplicaRackCostFunction(Configuration conf) {
1455      super(conf);
1456      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1457        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1458    }
1459
1460    @Override
1461    void init(Cluster cluster) {
1462      this.cluster = cluster;
1463      if (cluster.numRacks <= 1) {
1464        maxCost = 0;
1465        return; // disabled for 1 rack
1466      }
1467      // max cost is the case where every region replica is hosted together regardless of rack
1468      maxCost = getMaxCost(cluster);
1469      costsPerGroup = new long[cluster.numRacks];
1470      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1471        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1472      }
1473    }
1474
1475    @Override
1476    protected void regionMoved(int region, int oldServer, int newServer) {
1477      if (maxCost <= 0) {
1478        return; // no need to compute
1479      }
1480      int oldRack = cluster.serverIndexToRackIndex[oldServer];
1481      int newRack = cluster.serverIndexToRackIndex[newServer];
1482      if (newRack != oldRack) {
1483        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1484        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1485      }
1486    }
1487  }
1488
1489  /**
1490   * Compute the cost of total memstore size.  The more unbalanced the higher the
1491   * computed cost will be.  This uses a rolling average of regionload.
1492   */
1493  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
1494
1495    private static final String MEMSTORE_SIZE_COST_KEY =
1496        "hbase.master.balancer.stochastic.memstoreSizeCost";
1497    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1498
1499    MemStoreSizeCostFunction(Configuration conf) {
1500      super(conf);
1501      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1502    }
1503
1504    @Override
1505    protected double getCostFromRl(BalancerRegionLoad rl) {
1506      return rl.getMemStoreSizeMB();
1507    }
1508  }
1509
1510  /**
1511   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1512   * computed cost will be.  This uses a rolling average of regionload.
1513   */
1514  static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1515
1516    private static final String STOREFILE_SIZE_COST_KEY =
1517        "hbase.master.balancer.stochastic.storefileSizeCost";
1518    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1519
1520    StoreFileCostFunction(Configuration conf) {
1521      super(conf);
1522      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1523    }
1524
1525    @Override
1526    protected double getCostFromRl(BalancerRegionLoad rl) {
1527      return rl.getStorefileSizeMB();
1528    }
1529  }
1530
1531  /**
1532   * A helper function to compose the attribute name from tablename and costfunction name
1533   */
1534  public static String composeAttributeName(String tableName, String costFunctionName) {
1535    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1536  }
1537}