001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Random;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.HBaseInterfaceAudience;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.RegionMetrics;
036import org.apache.hadoop.hbase.ServerMetrics;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.master.MasterServices;
041import org.apache.hadoop.hbase.master.RegionPlan;
042import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
055import org.apache.hbase.thirdparty.com.google.common.base.Optional;
056import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
057
058
059/**
 * <p>This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C), then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
063 * <ul>
064 * <li>Region Load</li>
065 * <li>Table Load</li>
066 * <li>Data Locality</li>
067 * <li>Memstore Sizes</li>
068 * <li>Storefile Sizes</li>
069 * </ul>
070 *
071 *
 * <p>Every cost function returns a number between 0 and 1 inclusive, where 0 is the lowest-cost,
 * best solution and 1 is the highest possible cost, the worst solution. The computed costs are
 * scaled by their respective multipliers:</p>
075 *
076 * <ul>
077 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
078 *   <li>hbase.master.balancer.stochastic.moveCost</li>
079 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
080 *   <li>hbase.master.balancer.stochastic.localityCost</li>
081 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
082 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
083 * </ul>
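 *
 * <p>For example, a deployment that wants locality weighted more heavily might raise that
 * multiplier programmatically; the value below is purely illustrative:</p>
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.setFloat("hbase.master.balancer.stochastic.localityCost", 50f);
 * </pre>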
084 *
085 * <p>In addition to the above configurations, the balancer can be tuned by the following
086 * configuration values:</p>
087 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
091 *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
092 *   regions is multiplied to try and get the number of times the balancer will
093 *   mutate all servers.</li>
094 *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
095 *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
096 *   value and the above computation.</li>
097 * </ul>
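 *
 * <p>Continuing the sketch above, the length of the random walk can be tuned the same way
 * (illustrative values, not recommendations):</p>
 * <pre>
 * conf.setInt("hbase.master.balancer.stochastic.maxSteps", 2000000);
 * conf.setInt("hbase.master.balancer.stochastic.stepsPerRegion", 800);
 * conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 60 * 1000); // one minute
 * </pre>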
098 *
099 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
100 * so that the balancer gets the full picture of all loads on the cluster.</p>
101 */
102@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
103@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
104  justification="Complaint is about costFunctions not being synchronized; not end of the world")
105public class StochasticLoadBalancer extends BaseLoadBalancer {
106
107  protected static final String STEPS_PER_REGION_KEY =
108      "hbase.master.balancer.stochastic.stepsPerRegion";
109  protected static final String MAX_STEPS_KEY =
110      "hbase.master.balancer.stochastic.maxSteps";
111  protected static final String RUN_MAX_STEPS_KEY =
112      "hbase.master.balancer.stochastic.runMaxSteps";
113  protected static final String MAX_RUNNING_TIME_KEY =
114      "hbase.master.balancer.stochastic.maxRunningTime";
115  protected static final String KEEP_REGION_LOADS =
116      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
117  private static final String TABLE_FUNCTION_SEP = "_";
118  protected static final String MIN_COST_NEED_BALANCE_KEY =
119      "hbase.master.balancer.stochastic.minCostNeedBalance";
120
121  protected static final Random RANDOM = new Random(System.currentTimeMillis());
122  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
123
124  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
125
126  // values are defaults
127  private int maxSteps = 1000000;
128  private boolean runMaxSteps = false;
129  private int stepsPerRegion = 800;
  private long maxRunningTime = 30 * 1000; // 30 seconds.
131  private int numRegionLoadsToRemember = 15;
132  private float minCostNeedBalance = 0.05f;
133
134  private List<CandidateGenerator> candidateGenerators;
135  private CostFromRegionLoadFunction[] regionLoadFunctions;
136  private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
137
138  // to save and report costs to JMX
139  private Double curOverallCost = 0d;
140  private Double[] tempFunctionCosts;
141  private Double[] curFunctionCosts;
142
143  // Keep locality based picker and cost function to alert them
144  // when new services are offered
145  private LocalityBasedCandidateGenerator localityCandidateGenerator;
146  private ServerLocalityCostFunction localityCost;
147  private RackLocalityCostFunction rackLocalityCost;
148  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
149  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
150  private boolean isByTable = false;
151  private TableName tableName = null;
152
153  /**
   * The constructor that passes a MetricsStochasticBalancer to BaseLoadBalancer to replace its
   * default MetricsBalancer.
156   */
157  public StochasticLoadBalancer() {
158    super(new MetricsStochasticBalancer());
159  }
160
161  @Override
162  public void onConfigurationChange(Configuration conf) {
163    setConf(conf);
164  }
165
166  @Override
167  public synchronized void setConf(Configuration conf) {
168    super.setConf(conf);
169    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
170    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
171    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
172    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
173
174    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
175    isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
176    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
177    if (localityCandidateGenerator == null) {
178      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
179    }
180    localityCost = new ServerLocalityCostFunction(conf, services);
181    rackLocalityCost = new RackLocalityCostFunction(conf, services);
182
183    if (this.candidateGenerators == null) {
184      candidateGenerators = Lists.newArrayList();
185      candidateGenerators.add(new RandomCandidateGenerator());
186      candidateGenerators.add(new LoadCandidateGenerator());
187      candidateGenerators.add(localityCandidateGenerator);
188      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
189    }
190    regionLoadFunctions = new CostFromRegionLoadFunction[] {
191      new ReadRequestCostFunction(conf),
192      new CPRequestCostFunction(conf),
193      new WriteRequestCostFunction(conf),
194      new MemStoreSizeCostFunction(conf),
195      new StoreFileCostFunction(conf)
196    };
197    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
198    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
199    costFunctions = new CostFunction[]{
200      new RegionCountSkewCostFunction(conf),
201      new PrimaryRegionCountSkewCostFunction(conf),
202      new MoveCostFunction(conf),
203      localityCost,
204      rackLocalityCost,
205      new TableSkewCostFunction(conf),
206      regionReplicaHostCostFunction,
207      regionReplicaRackCostFunction,
208      regionLoadFunctions[0],
209      regionLoadFunctions[1],
210      regionLoadFunctions[2],
211      regionLoadFunctions[3],
212      regionLoadFunctions[4]
213    };
    curFunctionCosts = new Double[costFunctions.length];
    tempFunctionCosts = new Double[costFunctions.length];
216    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
217        ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
218  }
219
220  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
221    this.candidateGenerators = customCandidateGenerators;
222  }
223
224  @Override
225  protected void setSlop(Configuration conf) {
226    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
227  }
228
229  @Override
230  public synchronized void setClusterMetrics(ClusterMetrics st) {
231    super.setClusterMetrics(st);
232    updateRegionLoad();
233    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
234      cost.setClusterMetrics(st);
235    }
236
237    // update metrics size
238    try {
239      // by-table or ensemble mode
240      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
241      int functionsCount = getCostFunctionNames().length;
242
243      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
244    } catch (Exception e) {
245      LOG.error("failed to get the size of all tables", e);
246    }
247  }
248
249  /**
250   * Update the number of metrics that are reported to JMX
251   */
252  public void updateMetricsSize(int size) {
253    if (metricsBalancer instanceof MetricsStochasticBalancer) {
254        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
255    }
256  }
257
258  @Override
259  public synchronized void setMasterServices(MasterServices masterServices) {
260    super.setMasterServices(masterServices);
261    this.localityCost.setServices(masterServices);
262    this.rackLocalityCost.setServices(masterServices);
263    this.localityCandidateGenerator.setServices(masterServices);
264  }
265
266  @Override
267  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
268    regionReplicaHostCostFunction.init(c);
269    if (regionReplicaHostCostFunction.cost() > 0) return true;
270    regionReplicaRackCostFunction.init(c);
271    if (regionReplicaRackCostFunction.cost() > 0) return true;
272    return false;
273  }
274
275  @Override
276  protected boolean needsBalance(Cluster cluster) {
277    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
278    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
279      if (LOG.isDebugEnabled()) {
280        LOG.debug("Not running balancer because only " + cs.getNumServers()
281            + " active regionserver(s)");
282      }
283      return false;
284    }
285    if (areSomeRegionReplicasColocated(cluster)) {
286      return true;
287    }
288
289    double total = 0.0;
290    float sumMultiplier = 0.0f;
291    for (CostFunction c : costFunctions) {
292      float multiplier = c.getMultiplier();
293      if (multiplier <= 0) {
294        continue;
295      }
296      if (!c.isNeeded()) {
297        LOG.debug("{} not needed", c.getClass().getSimpleName());
298        continue;
299      }
300      sumMultiplier += multiplier;
301      total += c.cost() * multiplier;
302    }
303
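    // Balance only when the weighted average cost (total / sumMultiplier) over all enabled cost
    // functions reaches minCostNeedBalance; otherwise the cluster is considered balanced enough.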
304    if (total <= 0 || sumMultiplier <= 0
305        || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) {
306      if (LOG.isTraceEnabled()) {
        LOG.trace("Skipping load balancing because the cluster is balanced; total cost is " + total
          + ", sum multiplier is " + sumMultiplier + ", min cost which needs balance is "
          + minCostNeedBalance);
310      }
311      return false;
312    }
313    return true;
314  }
315
316  @Override
317  public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
318    List<RegionInfo>> clusterState) {
319    this.tableName = tableName;
320    return balanceCluster(clusterState);
321  }
322
323  @VisibleForTesting
324  Cluster.Action nextAction(Cluster cluster) {
325    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
326            .generate(cluster);
327  }
328
329  /**
   * Given the cluster state, this will try to approach an optimal balance. It
   * should always approach the optimal state given enough steps.
332   */
333  @Override
334  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
335    List<RegionInfo>> clusterState) {
336    List<RegionPlan> plans = balanceMasterRegions(clusterState);
337    if (plans != null || clusterState == null || clusterState.size() <= 1) {
338      return plans;
339    }
340
341    if (masterServerName != null && clusterState.containsKey(masterServerName)) {
342      if (clusterState.size() <= 2) {
343        return null;
344      }
345      clusterState = new HashMap<>(clusterState);
346      clusterState.remove(masterServerName);
347    }
348
349    // On clusters with lots of HFileLinks or lots of reference files,
350    // instantiating the storefile infos can be quite expensive.
351    // Allow turning this feature off if the locality cost is not going to
352    // be used in any computations.
353    RegionLocationFinder finder = null;
354    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
355        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
356      finder = this.regionFinder;
357    }
358
    // The clusterState that is given to this method contains the state
    // of all the regions in the table(s) (that's true today).
    // Keep track of servers to iterate through them.
362    Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
363
364    long startTime = EnvironmentEdgeManager.currentTime();
365
366    initCosts(cluster);
367
368    if (!needsBalance(cluster)) {
369      return null;
370    }
371
372    double currentCost = computeCost(cluster, Double.MAX_VALUE);
373    curOverallCost = currentCost;
374    for (int i = 0; i < this.curFunctionCosts.length; i++) {
375      curFunctionCosts[i] = tempFunctionCosts[i];
376    }
377    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
378        + functionCost());
379
380    double initCost = currentCost;
381    double newCost = currentCost;
382
383    long computedMaxSteps;
384    if (runMaxSteps) {
385      computedMaxSteps = Math.max(this.maxSteps,
386          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
387    } else {
388      computedMaxSteps = Math.min(this.maxSteps,
389          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
390    }
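    // With runMaxSteps the per-region estimate (numRegions * stepsPerRegion * numServers) acts
    // as a floor; otherwise maxSteps caps the walk so large clusters do not run unbounded.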
391    // Perform a stochastic walk to see if we can get a good fit.
392    long step;
393
394    for (step = 0; step < computedMaxSteps; step++) {
395      Cluster.Action action = nextAction(cluster);
396
397      if (action.type == Type.NULL) {
398        continue;
399      }
400
401      cluster.doAction(action);
402      updateCostsWithAction(cluster, action);
403
404      newCost = computeCost(cluster, currentCost);
405
406      // Should this be kept?
407      if (newCost < currentCost) {
408        currentCost = newCost;
409
410        // save for JMX
411        curOverallCost = currentCost;
412        for (int i = 0; i < this.curFunctionCosts.length; i++) {
413          curFunctionCosts[i] = tempFunctionCosts[i];
414        }
415      } else {
416        // Put things back the way they were before.
417        // TODO: undo by remembering old values
418        Action undoAction = action.undoAction();
419        cluster.doAction(undoAction);
420        updateCostsWithAction(cluster, undoAction);
421      }
422
423      if (EnvironmentEdgeManager.currentTime() - startTime >
424          maxRunningTime) {
425        break;
426      }
427    }
428    long endTime = EnvironmentEdgeManager.currentTime();
429
430    metricsBalancer.balanceCluster(endTime - startTime);
431
432    // update costs metrics
433    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
434    if (initCost > currentCost) {
435      plans = createRegionPlans(cluster);
436      if (LOG.isDebugEnabled()) {
437        LOG.debug("Finished computing new load balance plan.  Computation took "
438            + (endTime - startTime) + "ms to try " + step
439            + " different iterations.  Found a solution that moves "
440            + plans.size() + " regions; Going from a computed cost of "
441            + initCost + " to a new cost of " + currentCost);
442      }
443
444      return plans;
445    }
446    if (LOG.isDebugEnabled()) {
447      LOG.debug("Could not find a better load balance plan.  Tried "
448          + step + " different configurations in " + (endTime - startTime)
449          + "ms, and did not find anything with a computed cost less than " + initCost);
450    }
451    return null;
452  }
453
454  /**
455   * update costs to JMX
456   */
457  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
458    if (tableName == null) return;
459
460    // check if the metricsBalancer is MetricsStochasticBalancer before casting
461    if (metricsBalancer instanceof MetricsStochasticBalancer) {
462      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
463      // overall cost
464      balancer.updateStochasticCost(tableName.getNameAsString(),
465        "Overall", "Overall cost", overall);
466
467      // each cost function
468      for (int i = 0; i < costFunctions.length; i++) {
469        CostFunction costFunction = costFunctions[i];
470        String costFunctionName = costFunction.getClass().getSimpleName();
471        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
472        // TODO: cost function may need a specific description
473        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
474          "The percent of " + costFunctionName, costPercent);
475      }
476    }
477  }
478
479  private String functionCost() {
480    StringBuilder builder = new StringBuilder();
481    for (CostFunction c:costFunctions) {
482      builder.append(c.getClass().getSimpleName());
483      builder.append(" : (");
484      builder.append(c.getMultiplier());
485      builder.append(", ");
486      builder.append(c.cost());
487      builder.append("); ");
488    }
489    return builder.toString();
490  }
491
492  /**
   * Create all of the RegionPlans needed to move from the initial cluster state to the desired
   * state.
   *
   * @param cluster The state of the cluster
   * @return List of RegionPlans that represent the moves needed to get to the desired final state.
498   */
499  private List<RegionPlan> createRegionPlans(Cluster cluster) {
500    List<RegionPlan> plans = new LinkedList<>();
501    for (int regionIndex = 0;
502         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
503      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
504      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
505
506      if (initialServerIndex != newServerIndex) {
507        RegionInfo region = cluster.regions[regionIndex];
508        ServerName initialServer = cluster.servers[initialServerIndex];
509        ServerName newServer = cluster.servers[newServerIndex];
510
511        if (LOG.isTraceEnabled()) {
512          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
513              + initialServer.getHostname() + " to " + newServer.getHostname());
514        }
515        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
516        plans.add(rp);
517      }
518    }
519    return plans;
520  }
521
522  /**
523   * Store the current region loads.
524   */
525  private synchronized void updateRegionLoad() {
526    // We create a new hashmap so that regions that are no longer there are removed.
527    // However we temporarily need the old loads so we can use them to keep the rolling average.
528    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
529    loads = new HashMap<>();
530
531    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
532      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
533        Deque<BalancerRegionLoad> rLoads = oldLoads.get(Bytes.toString(regionName));
534        if (rLoads == null) {
535          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
536        } else if (rLoads.size() >= numRegionLoadsToRemember) {
537          rLoads.remove();
538        }
539        rLoads.add(new BalancerRegionLoad(rm));
540        loads.put(Bytes.toString(regionName), rLoads);
541      });
542    });
543
544    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
545      cost.setLoads(loads);
546    }
547  }
548
549  protected void initCosts(Cluster cluster) {
550    for (CostFunction c:costFunctions) {
551      c.init(cluster);
552    }
553  }
554
555  protected void updateCostsWithAction(Cluster cluster, Action action) {
556    for (CostFunction c : costFunctions) {
557      c.postAction(action);
558    }
559  }
560
561  /**
562   * Get the names of the cost functions
563   */
564  public String[] getCostFunctionNames() {
565    if (costFunctions == null) return null;
566    String[] ret = new String[costFunctions.length];
567    for (int i = 0; i < costFunctions.length; i++) {
568      CostFunction c = costFunctions[i];
569      ret[i] = c.getClass().getSimpleName();
570    }
571
572    return ret;
573  }
574
575  /**
576   * This is the main cost function.  It will compute a cost associated with a proposed cluster
577   * state.  All different costs will be combined with their multipliers to produce a double cost.
578   *
579   * @param cluster The state of the cluster
580   * @param previousCost the previous cost. This is used as an early out.
581   * @return a double of a cost associated with the proposed cluster state.  This cost is an
582   *         aggregate of all individual cost functions.
583   */
584  protected double computeCost(Cluster cluster, double previousCost) {
585    double total = 0;
586
587    for (int i = 0; i < costFunctions.length; i++) {
588      CostFunction c = costFunctions[i];
589      this.tempFunctionCosts[i] = 0.0;
590
591      if (c.getMultiplier() <= 0) {
592        continue;
593      }
594
595      Float multiplier = c.getMultiplier();
596      Double cost = c.cost();
597
598      this.tempFunctionCosts[i] = multiplier*cost;
599      total += this.tempFunctionCosts[i];
600
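      // Early out: once the running total exceeds the previous (best-known) cost, this candidate
      // state cannot be an improvement, so stop summing the remaining functions.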
601      if (total > previousCost) {
602        break;
603      }
604    }
605
606    return total;
607  }
608
609  /** Generates a candidate action to be applied to the cluster for cost function search */
610  abstract static class CandidateGenerator {
611    abstract Cluster.Action generate(Cluster cluster);
612
613    /**
     * From a list of regions pick a random one. A value of -1 can be returned, which callers
     * treat as a signal to try a region move rather than a swap.
     *
     * @param cluster        The state of the cluster
     * @param server         index of the server
     * @param chanceOfNoSwap Chance that this will decide to try a move rather
     *                       than a swap.
     * @return the index of a random region on the server, or -1 if a move rather than a swap is
     *         suggested.
624     */
625    protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
626      // Check to see if this is just a move.
627      if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
628        // signal a move only.
629        return -1;
630      }
631      int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
      return cluster.regionsPerServer[server][rand];
    }

    protected int pickRandomServer(Cluster cluster) {
636      if (cluster.numServers < 1) {
637        return -1;
638      }
639
640      return RANDOM.nextInt(cluster.numServers);
641    }
642
643    protected int pickRandomRack(Cluster cluster) {
644      if (cluster.numRacks < 1) {
645        return -1;
646      }
647
648      return RANDOM.nextInt(cluster.numRacks);
649    }
650
651    protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
652      if (cluster.numServers < 2) {
653        return -1;
654      }
655      while (true) {
656        int otherServerIndex = pickRandomServer(cluster);
657        if (otherServerIndex != serverIndex) {
658          return otherServerIndex;
659        }
660      }
661    }
662
663    protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
664      if (cluster.numRacks < 2) {
665        return -1;
666      }
667      while (true) {
668        int otherRackIndex = pickRandomRack(cluster);
669        if (otherRackIndex != rackIndex) {
670          return otherRackIndex;
671        }
672      }
673    }
674
675    protected Cluster.Action pickRandomRegions(Cluster cluster,
676                                                       int thisServer,
677                                                       int otherServer) {
678      if (thisServer < 0 || otherServer < 0) {
679        return Cluster.NullAction;
680      }
681
682      // Decide who is most likely to need another region
683      int thisRegionCount = cluster.getNumRegions(thisServer);
684      int otherRegionCount = cluster.getNumRegions(otherServer);
685
686      // Assign the chance based upon the above
687      double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
688      double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
689
690      int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
691      int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
692
693      return getAction(thisServer, thisRegion, otherServer, otherRegion);
694    }
695
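    /**
     * Build the concrete action for the chosen indexes: two valid region indexes yield a swap,
     * exactly one yields a move of that region to the other server, and none yields no action.
     */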
696    protected Cluster.Action getAction(int fromServer, int fromRegion,
697        int toServer, int toRegion) {
698      if (fromServer < 0 || toServer < 0) {
699        return Cluster.NullAction;
700      }
      if (fromRegion >= 0 && toRegion >= 0) {
        return new Cluster.SwapRegionsAction(fromServer, fromRegion,
          toServer, toRegion);
      } else if (fromRegion >= 0) {
        return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
      } else if (toRegion >= 0) {
707        return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
708      } else {
709        return Cluster.NullAction;
710      }
711    }
712
713    /**
714     * Returns a random iteration order of indexes of an array with size length
715     */
716    protected List<Integer> getRandomIterationOrder(int length) {
717      ArrayList<Integer> order = new ArrayList<>(length);
718      for (int i = 0; i < length; i++) {
719        order.add(i);
720      }
721      Collections.shuffle(order);
722      return order;
723    }
724  }
725
726  static class RandomCandidateGenerator extends CandidateGenerator {
727
728    @Override
729    Cluster.Action generate(Cluster cluster) {
730
731      int thisServer = pickRandomServer(cluster);
732
733      // Pick the other server
734      int otherServer = pickOtherRandomServer(cluster, thisServer);
735
736      return pickRandomRegions(cluster, thisServer, otherServer);
737    }
738  }
739
740  static class LoadCandidateGenerator extends CandidateGenerator {
741
742    @Override
743    Cluster.Action generate(Cluster cluster) {
744      cluster.sortServersByRegionCount();
745      int thisServer = pickMostLoadedServer(cluster, -1);
746      int otherServer = pickLeastLoadedServer(cluster, thisServer);
747
748      return pickRandomRegions(cluster, thisServer, otherServer);
749    }
750
751    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
752      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
753
754      int index = 0;
755      while (servers[index] == null || servers[index] == thisServer) {
756        index++;
757        if (index == servers.length) {
758          return -1;
759        }
760      }
761      return servers[index];
762    }
763
764    private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
765      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
766
767      int index = servers.length - 1;
768      while (servers[index] == null || servers[index] == thisServer) {
769        index--;
770        if (index < 0) {
771          return -1;
772        }
773      }
774      return servers[index];
775    }
776  }
777
778  static class LocalityBasedCandidateGenerator extends CandidateGenerator {
779
780    private MasterServices masterServices;
781
782    LocalityBasedCandidateGenerator(MasterServices masterServices) {
783      this.masterServices = masterServices;
784    }
785
786    @Override
787    Cluster.Action generate(Cluster cluster) {
788      if (this.masterServices == null) {
789        int thisServer = pickRandomServer(cluster);
790        // Pick the other server
791        int otherServer = pickOtherRandomServer(cluster, thisServer);
792        return pickRandomRegions(cluster, thisServer, otherServer);
793      }
794
795      // Randomly iterate through regions until you find one that is not on ideal host
796      for (int region : getRandomIterationOrder(cluster.numRegions)) {
797        int currentServer = cluster.regionIndexToServerIndex[region];
798        if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
799          Optional<Action> potential = tryMoveOrSwap(
800              cluster,
801              currentServer,
802              region,
803              cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
804          );
805          if (potential.isPresent()) {
806            return potential.get();
807          }
808        }
809      }
810      return Cluster.NullAction;
811    }
812
813    /**
     * Try to generate a move/swap of fromRegion between fromServer and toServer such that
     * locality is improved. Returns an empty Optional if no such action can be found.
     */
817    private Optional<Action> tryMoveOrSwap(Cluster cluster,
818                                           int fromServer,
819                                           int fromRegion,
820                                           int toServer) {
      // Try a move first. We know a priori that fromRegion has the highest locality on toServer.
822      if (cluster.serverHasTooFewRegions(toServer)) {
823        return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
824      }
825
826      // Compare locality gain/loss from swapping fromRegion with regions on toServer
827      double fromRegionLocalityDelta =
828          getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
829      for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
830        int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
831        double toRegionLocalityDelta =
832            getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
833        // If locality would remain neutral or improve, attempt the swap
834        if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
835          return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
836        }
837      }
838
839      return Optional.absent();
840    }
841
842    private double getWeightedLocality(Cluster cluster, int region, int server) {
843      return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
844    }
845
846    void setServices(MasterServices services) {
847      this.masterServices = services;
848    }
849  }
850
851  /**
   * Generates candidates that move replicas out of a region server on which
   * region replicas are co-hosted.
854   */
855  static class RegionReplicaCandidateGenerator extends CandidateGenerator {
856
857    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
858
859    /**
860     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
861     * (a group is a server, host or rack)
862     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
863     * primariesOfRegionsPerHost or primariesOfRegionsPerRack
864     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
865     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
     * @return a regionIndex for the selected primary or -1 if there is no co-location
867     */
    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup,
        int[] regionIndexToPrimaryIndex) {
870      int currentPrimary = -1;
871      int currentPrimaryIndex = -1;
872      int selectedPrimaryIndex = -1;
873      double currentLargestRandom = -1;
874      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
875      // ids for the regions hosted in server, a consecutive repetition means that replicas
876      // are co-hosted
877      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
878        int primary = j < primariesOfRegionsPerGroup.length
879            ? primariesOfRegionsPerGroup[j] : -1;
880        if (primary != currentPrimary) { // check for whether we see a new primary
881          int numReplicas = j - currentPrimaryIndex;
882          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
883            // decide to select this primary region id or not
884            double currentRandom = RANDOM.nextDouble();
885            // we don't know how many region replicas are co-hosted, we will randomly select one
886            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
887            if (currentRandom > currentLargestRandom) {
888              selectedPrimaryIndex = currentPrimary;
889              currentLargestRandom = currentRandom;
890            }
891          }
892          currentPrimary = primary;
893          currentPrimaryIndex = j;
894        }
895      }
896
897      // we have found the primary id for the region to move. Now find the actual regionIndex
898      // with the given primary, prefer to move the secondary region.
899      for (int j = 0; j < regionsPerGroup.length; j++) {
900        int regionIndex = regionsPerGroup[j];
901        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
902          // always move the secondary, not the primary
903          if (selectedPrimaryIndex != regionIndex) {
904            return regionIndex;
905          }
906        }
907      }
908      return -1;
909    }
910
911    @Override
912    Cluster.Action generate(Cluster cluster) {
913      int serverIndex = pickRandomServer(cluster);
914      if (cluster.numServers <= 1 || serverIndex == -1) {
915        return Cluster.NullAction;
916      }
917
918      int regionIndex = selectCoHostedRegionPerGroup(
919        cluster.primariesOfRegionsPerServer[serverIndex],
920        cluster.regionsPerServer[serverIndex],
921        cluster.regionIndexToPrimaryIndex);
922
923      // if there are no pairs of region replicas co-hosted, default to random generator
924      if (regionIndex == -1) {
925        // default to randompicker
926        return randomGenerator.generate(cluster);
927      }
928
929      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
930      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
931      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
932    }
933  }
934
935  /**
   * Generates candidates that move replicas out of a rack on which
   * region replicas are co-hosted.
938   */
939  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
940    @Override
941    Cluster.Action generate(Cluster cluster) {
942      int rackIndex = pickRandomRack(cluster);
943      if (cluster.numRacks <= 1 || rackIndex == -1) {
944        return super.generate(cluster);
945      }
946
947      int regionIndex = selectCoHostedRegionPerGroup(
948        cluster.primariesOfRegionsPerRack[rackIndex],
949        cluster.regionsPerRack[rackIndex],
950        cluster.regionIndexToPrimaryIndex);
951
952      // if there are no pairs of region replicas co-hosted, default to random generator
953      if (regionIndex == -1) {
954        // default to randompicker
955        return randomGenerator.generate(cluster);
956      }
957
958      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
959      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
960
961      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
962      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
963      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
964      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
965    }
966  }
967
968  /**
969   * Base class of StochasticLoadBalancer's Cost Functions.
970   */
971  abstract static class CostFunction {
972
973    private float multiplier = 0;
974
975    protected Cluster cluster;
976
977    CostFunction(Configuration c) {
978    }
979
980    boolean isNeeded() {
981      return true;
    }

    float getMultiplier() {
984      return multiplier;
985    }
986
987    void setMultiplier(float m) {
988      this.multiplier = m;
989    }
990
    /** Called once per LB invocation to give the cost function
     * a chance to initialize its state, and perform any costly calculation.
993     */
994    void init(Cluster cluster) {
995      this.cluster = cluster;
996    }
997
998    /** Called once per cluster Action to give the cost function
     * an opportunity to update its state. postAction() is always
1000     * called at least once before cost() is called with the cluster
1001     * that this action is performed on. */
1002    void postAction(Action action) {
1003      switch (action.type) {
1004      case NULL: break;
1005      case ASSIGN_REGION:
1006        AssignRegionAction ar = (AssignRegionAction) action;
1007        regionMoved(ar.region, -1, ar.server);
1008        break;
1009      case MOVE_REGION:
1010        MoveRegionAction mra = (MoveRegionAction) action;
1011        regionMoved(mra.region, mra.fromServer, mra.toServer);
1012        break;
1013      case SWAP_REGIONS:
1014        SwapRegionsAction a = (SwapRegionsAction) action;
1015        regionMoved(a.fromRegion, a.fromServer, a.toServer);
1016        regionMoved(a.toRegion, a.toServer, a.fromServer);
1017        break;
1018      default:
        throw new RuntimeException("Unknown action: " + action.type);
1020      }
1021    }
1022
1023    protected void regionMoved(int region, int oldServer, int newServer) {
1024    }
1025
1026    abstract double cost();
1027
1028    /**
     * Function to compute a scaled cost by summing each element's absolute deviation from the
     * mean. It assumes that this is a zero sum set of costs.  It assumes that the worst case
     * possible is all of the elements in one region server and the rest having 0.
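     *
     * <p>As an illustrative example, for {@code stats = {3, 1}} the mean is 2, the worst-case
     * spread ({@code max}) is 4, the best-case spread ({@code min}) is 0, and the summed
     * deviation from the mean is 2, so the scaled cost is {@code scale(0, 4, 2)} = 0.5.</p>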
1032     *
1033     * @param stats the costs
1034     * @return a scaled set of costs.
1035     */
1036    protected double costFromArray(double[] stats) {
1037      double totalCost = 0;
1038      double total = getSum(stats);
1039
1040      double count = stats.length;
1041      double mean = total/count;
1042
1043      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
1044      // a zero sum cost for this to make sense.
1045      double max = ((count - 1) * mean) + (total - mean);
1046
1047      // It's possible that there aren't enough regions to go around
1048      double min;
1049      if (count > total) {
1050        min = ((count - total) * mean) + ((1 - mean) * total);
1051      } else {
1052        // Some will have 1 more than everything else.
1053        int numHigh = (int) (total - (Math.floor(mean) * count));
1054        int numLow = (int) (count - numHigh);
1055
1056        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
1057
1058      }
1059      min = Math.max(0, min);
1060      for (int i=0; i<stats.length; i++) {
1061        double n = stats[i];
1062        double diff = Math.abs(mean - n);
1063        totalCost += diff;
1064      }
1065
1066      double scaled =  scale(min, max, totalCost);
1067      return scaled;
1068    }
1069
1070    private double getSum(double[] stats) {
1071      double total = 0;
1072      for(double s:stats) {
1073        total += s;
1074      }
1075      return total;
1076    }
1077
1078    /**
1079     * Scale the value between 0 and 1.
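     * For example, {@code scale(0, 10, 2.5)} returns 0.25, while any value at or below
     * {@code min} returns 0.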
1080     *
1081     * @param min   Min value
1082     * @param max   The Max value
1083     * @param value The value to be scaled.
1084     * @return The scaled value.
1085     */
1086    protected double scale(double min, double max, double value) {
1087      if (max <= min || value <= min) {
1088        return 0;
1089      }
1090      if ((max - min) == 0) return 0;
1091
1092      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
1093    }
1094  }
1095
1096  /**
1097   * Given the starting state of the regions and a potential ending state
1098   * compute cost based upon the number of regions that have moved.
1099   */
1100  static class MoveCostFunction extends CostFunction {
1101    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
1102    private static final String MAX_MOVES_PERCENT_KEY =
1103        "hbase.master.balancer.stochastic.maxMovePercent";
1104    private static final float DEFAULT_MOVE_COST = 7;
1105    private static final int DEFAULT_MAX_MOVES = 600;
1106    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1107
1108    private final float maxMovesPercent;
1109
1110    MoveCostFunction(Configuration conf) {
1111      super(conf);
1112
      // The move cost multiplier should be the same as or higher than the rest of the costs to
      // ensure that large benefits are needed to overcome the cost of a move.
1115      this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1116      // What percent of the number of regions a single run of the balancer can move.
1117      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1118    }
1119
1120    @Override
1121    double cost() {
1122      // Try and size the max number of Moves, but always be prepared to move some.
1123      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1124          DEFAULT_MAX_MOVES);
1125
1126      double moveCost = cluster.numMovedRegions;
1127
1128      // Don't let this single balance move more than the max moves.
1129      // This allows better scaling to accurately represent the actual cost of a move.
1130      if (moveCost > maxMoves) {
        return 1000000;   // return a number much greater than any of the other costs
1132      }
1133
1134      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
1135    }
1136  }
1137
1138  /**
1139   * Compute the cost of a potential cluster state from skew in number of
1140   * regions on a cluster.
1141   */
1142  static class RegionCountSkewCostFunction extends CostFunction {
1143    private static final String REGION_COUNT_SKEW_COST_KEY =
1144        "hbase.master.balancer.stochastic.regionCountCost";
1145    private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1146
1147    private double[] stats = null;
1148
1149    RegionCountSkewCostFunction(Configuration conf) {
1150      super(conf);
1151      // Load multiplier should be the greatest as it is the most general way to balance data.
1152      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1153    }
1154
1155    @Override
1156    double cost() {
1157      if (stats == null || stats.length != cluster.numServers) {
1158        stats = new double[cluster.numServers];
1159      }
1160
1161      for (int i =0; i < cluster.numServers; i++) {
1162        stats[i] = cluster.regionsPerServer[i].length;
1163      }
1164
1165      return costFromArray(stats);
1166    }
1167  }
1168
1169  /**
1170   * Compute the cost of a potential cluster state from skew in number of
1171   * primary regions on a cluster.
1172   */
1173  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1174    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1175        "hbase.master.balancer.stochastic.primaryRegionCountCost";
1176    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1177
1178    private double[] stats = null;
1179
1180    PrimaryRegionCountSkewCostFunction(Configuration conf) {
1181      super(conf);
1182      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1183      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1184        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1185    }
1186
1187    @Override
1188    double cost() {
1189      if (!cluster.hasRegionReplicas) {
1190        return 0;
1191      }
1192      if (stats == null || stats.length != cluster.numServers) {
1193        stats = new double[cluster.numServers];
1194      }
1195
1196      for (int i = 0; i < cluster.numServers; i++) {
1197        stats[i] = 0;
1198        for (int regionIdx : cluster.regionsPerServer[i]) {
1199          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1200            stats[i]++;
1201          }
1202        }
1203      }
1204
1205      return costFromArray(stats);
1206    }
1207  }
1208
1209  /**
1210   * Compute the cost of a potential cluster configuration based upon how evenly
1211   * distributed tables are.
1212   */
1213  static class TableSkewCostFunction extends CostFunction {
1214
1215    private static final String TABLE_SKEW_COST_KEY =
1216        "hbase.master.balancer.stochastic.tableSkewCost";
1217    private static final float DEFAULT_TABLE_SKEW_COST = 35;
1218
1219    TableSkewCostFunction(Configuration conf) {
1220      super(conf);
1221      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1222    }
1223
1224    @Override
1225    double cost() {
1226      double max = cluster.numRegions;
1227      double min = ((double) cluster.numRegions) / cluster.numServers;
1228      double value = 0;
1229
1230      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1231        value += cluster.numMaxRegionsPerTable[i];
1232      }
1233
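      // value sums, over all tables, the largest per-server region count for that table. It is
      // scaled between the ideal even spread (numRegions / numServers in total) and the worst
      // case of every table concentrated on a single server (numRegions in total).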
1234      return scale(min, max, value);
1235    }
1236  }
1237
1238  /**
1239   * Compute a cost of a potential cluster configuration based upon where
1240   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
1241   */
  abstract static class LocalityBasedCostFunction extends CostFunction {
1243
1244    private final LocalityType type;
1245
1246    private double bestLocality; // best case locality across cluster weighted by local data size
1247    private double locality; // current locality across cluster weighted by local data size
1248
1249    private MasterServices services;
1250
1251    LocalityBasedCostFunction(Configuration conf,
1252                              MasterServices srv,
1253                              LocalityType type,
1254                              String localityCostKey,
1255                              float defaultLocalityCost) {
1256      super(conf);
1257      this.type = type;
1258      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1259      this.services = srv;
1260      this.locality = 0.0;
1261      this.bestLocality = 0.0;
1262    }
1263
1264    /**
1265     * Maps region to the current entity (server or rack) on which it is stored
1266     */
1267    abstract int regionIndexToEntityIndex(int region);
1268
1269    public void setServices(MasterServices srvc) {
1270      this.services = srvc;
1271    }
1272
1273    @Override
1274    void init(Cluster cluster) {
1275      super.init(cluster);
1276      locality = 0.0;
1277      bestLocality = 0.0;
1278
1279      // If no master, no computation will work, so assume 0 cost
1280      if (this.services == null) {
1281        return;
1282      }
1283
1284      for (int region = 0; region < cluster.numRegions; region++) {
1285        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1286        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1287      }
1288
      // We normalize locality to be a score between 0 and 1.0 representing how good it
      // is compared to how good it could be. If bestLocality is 0, assume locality is 100%
      // (and the cost is 0).
1292      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1293    }
1294
1295    @Override
1296    protected void regionMoved(int region, int oldServer, int newServer) {
1297      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1298      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1299      if (this.services == null) {
1300        return;
1301      }
1302      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1303      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1304      locality += normalizedDelta;
1305    }
1306
1307    @Override
1308    double cost() {
1309      return 1 - locality;
1310    }
1311
1312    private int getMostLocalEntityForRegion(int region) {
1313      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1314    }
1315
1316    private double getWeightedLocality(int region, int entity) {
1317      return cluster.getOrComputeWeightedLocality(region, entity, type);
1318    }
1319
1320  }
1321
1322  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1323
1324    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1325    private static final float DEFAULT_LOCALITY_COST = 25;
1326
1327    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1328      super(
1329          conf,
1330          srv,
1331          LocalityType.SERVER,
1332          LOCALITY_COST_KEY,
1333          DEFAULT_LOCALITY_COST
1334      );
1335    }
1336
1337    @Override
1338    int regionIndexToEntityIndex(int region) {
1339      return cluster.regionIndexToServerIndex[region];
1340    }
1341  }
1342
1343  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1344
1345    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1346    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1347
1348    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1349      super(
1350          conf,
1351          services,
1352          LocalityType.RACK,
1353          RACK_LOCALITY_COST_KEY,
1354          DEFAULT_RACK_LOCALITY_COST
1355      );
1356    }
1357
1358    @Override
1359    int regionIndexToEntityIndex(int region) {
1360      return cluster.getRackForRegion(region);
1361    }
1362  }
1363
1364  /**
   * Base class that allows writing cost functions from a rolling average of some
   * number from RegionLoad.
1367   */
1368  abstract static class CostFromRegionLoadFunction extends CostFunction {
1369
1370    private ClusterMetrics clusterStatus = null;
1371    private Map<String, Deque<BalancerRegionLoad>> loads = null;
    private double[] stats = null;

    CostFromRegionLoadFunction(Configuration conf) {
1374      super(conf);
1375    }
1376
1377    void setClusterMetrics(ClusterMetrics status) {
1378      this.clusterStatus = status;
1379    }
1380
1381    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1382      this.loads = l;
1383    }
1384
1385    @Override
1386    double cost() {
1387      if (clusterStatus == null || loads == null) {
1388        return 0;
1389      }
1390
1391      if (stats == null || stats.length != cluster.numServers) {
1392        stats = new double[cluster.numServers];
1393      }
1394
      for (int i = 0; i < stats.length; i++) {
        // Cost this server has from RegionLoad
        long cost = 0;

        // for every region on this server get the rl
        for (int regionIndex : cluster.regionsPerServer[i]) {
          Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
1402
1403          // Now if we found a region load get the type of cost that was requested.
1404          if (regionLoadList != null) {
1405            cost = (long) (cost + getRegionLoadCost(regionLoadList));
1406          }
1407        }
1408
1409        // Add the total cost to the stats.
1410        stats[i] = cost;
1411      }
1412
1413      // Now return the scaled cost from data held in the stats object.
1414      return costFromArray(stats);
1415    }
1416
1417    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1418      double cost = 0;
1419      for (BalancerRegionLoad rl : regionLoadList) {
1420        cost += getCostFromRl(rl);
1421      }
1422      return cost / regionLoadList.size();
1423    }
1424
1425    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1426  }
1427
1428  /**
1429   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
   * We do not care about the actual rate in requests per second but rather about the rate
   * relative to the rest of the regions.
1432   */
1433  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1434
1435    CostFromRegionLoadAsRateFunction(Configuration conf) {
1436      super(conf);
1437    }
1438
1439    @Override
1440    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1441      double cost = 0;
1442      double previous = 0;
1443      boolean isFirst = true;
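      // The consecutive deltas below telescope to (newest sample - oldest sample), so the result
      // is the average increase per remembered interval; Math.max guards against counter
      // decreases, e.g. after a region has moved.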
1444      for (BalancerRegionLoad rl : regionLoadList) {
1445        double current = getCostFromRl(rl);
1446        if (isFirst) {
1447          isFirst = false;
1448        } else {
1449          cost += current - previous;
1450        }
1451        previous = current;
1452      }
1453      return Math.max(0, cost / (regionLoadList.size() - 1));
1454    }
1455  }
1456
  /**
   * Compute the cost of the total number of read requests.  The more unbalanced the higher the
   * computed cost will be.  This uses a rolling average of RegionLoad.
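   *
   * <p>Illustrative sketch only: the weight of this function can be tuned through its multiplier
   * key before the balancer is constructed, e.g.</p>
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * // Double the default read-request weight of 5.
   * conf.setFloat("hbase.master.balancer.stochastic.readRequestCost", 10f);
   * </pre>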
   */
  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {

    private static final String READ_REQUEST_COST_KEY =
        "hbase.master.balancer.stochastic.readRequestCost";
    private static final float DEFAULT_READ_REQUEST_COST = 5;

    ReadRequestCostFunction(Configuration conf) {
      super(conf);
      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
    }

    @Override
    protected double getCostFromRl(BalancerRegionLoad rl) {
      return rl.getReadRequestsCount();
    }
  }

  /**
   * Compute the cost of the total number of coprocessor requests.  The more unbalanced the
   * higher the computed cost will be.  This uses a rolling average of RegionLoad.
   */
  static class CPRequestCostFunction extends CostFromRegionLoadAsRateFunction {

    private static final String CP_REQUEST_COST_KEY =
        "hbase.master.balancer.stochastic.cpRequestCost";
    private static final float DEFAULT_CP_REQUEST_COST = 5;

    CPRequestCostFunction(Configuration conf) {
      super(conf);
      this.setMultiplier(conf.getFloat(CP_REQUEST_COST_KEY, DEFAULT_CP_REQUEST_COST));
    }

    @Override
    protected double getCostFromRl(BalancerRegionLoad rl) {
      return rl.getCpRequestsCount();
    }
  }

  /**
   * Compute the cost of the total number of write requests.  The more unbalanced the higher the
   * computed cost will be.  This uses a rolling average of RegionLoad.
   */
  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {

    private static final String WRITE_REQUEST_COST_KEY =
        "hbase.master.balancer.stochastic.writeRequestCost";
    private static final float DEFAULT_WRITE_REQUEST_COST = 5;

    WriteRequestCostFunction(Configuration conf) {
      super(conf);
      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
    }

    @Override
    protected double getCostFromRl(BalancerRegionLoad rl) {
      return rl.getWriteRequestsCount();
    }
  }

  /**
   * A cost function for region replicas. We give a very high cost to hosting
   * replicas of the same region in the same host. We do not prevent the case
   * though, since if numReplicas &gt; numRegionServers, we still want to keep the
   * replica open.
   */
  static class RegionReplicaHostCostFunction extends CostFunction {
    private static final String REGION_REPLICA_HOST_COST_KEY =
        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;

    long maxCost = 0;
    long[] costsPerGroup; // group is either server, host or rack
    int[][] primariesOfRegionsPerGroup;

    public RegionReplicaHostCostFunction(Configuration conf) {
      super(conf);
      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
    }

    @Override
    void init(Cluster cluster) {
      super.init(cluster);
      // max cost is the case where every region replica is hosted together regardless of host
      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
      costsPerGroup = new long[cluster.numHosts];
      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
          ? cluster.primariesOfRegionsPerHost
          : cluster.primariesOfRegionsPerServer;
      for (int i = 0; i < primariesOfRegionsPerGroup.length; i++) {
        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
      }
    }

    long getMaxCost(Cluster cluster) {
      if (!cluster.hasRegionReplicas) {
        return 0; // short circuit
      }
      // max cost is the case where every region replica is hosted together regardless of host
      int[] primariesOfRegions = new int[cluster.numRegions];
      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
          cluster.regions.length);

      Arrays.sort(primariesOfRegions);

      // compute numReplicas from the sorted array
      return costPerGroup(primariesOfRegions);
    }

    @Override
    boolean isNeeded() {
      return cluster.hasRegionReplicas;
    }

    @Override
    double cost() {
      if (maxCost <= 0) {
        return 0;
      }

      long totalCost = 0;
      for (int i = 0; i < costsPerGroup.length; i++) {
        totalCost += costsPerGroup[i];
      }
      return scale(0, maxCost, totalCost);
    }

    /**
     * For each primary region, it computes the total number of replicas in the array
     * (numReplicas) and returns a sum of (numReplicas - 1) squared. For example, if the group
     * hosts regions a, b, c, d, e, f where a and b are replicas of the same region, and c, d, e
     * are replicas of another region, it returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
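     *
     * <p>Illustrative walk-through with a hypothetical sorted input {0, 0, 1, 1, 1, 2}:
     * primary 0 appears twice, contributing (2-1)^2 = 1; primary 1 appears three times,
     * contributing (3-1)^2 = 4; primary 2 appears once, contributing 0; so the method
     * returns 5.</p>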
     * @param primariesOfRegions a sorted array of primary region ids for the regions hosted
     * @return a sum of (numReplicas - 1) squared for each primary region in the group.
     */
    protected long costPerGroup(int[] primariesOfRegions) {
      long cost = 0;
      int currentPrimary = -1;
      int currentPrimaryIndex = -1;
      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
      // sharing the same primary will have consecutive numbers in the array.
      for (int j = 0; j <= primariesOfRegions.length; j++) {
        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
        if (primary != currentPrimary) { // we see a new primary
          int numReplicas = j - currentPrimaryIndex;
          // square the cost
          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
            cost += (numReplicas - 1) * (numReplicas - 1);
          }
          currentPrimary = primary;
          currentPrimaryIndex = j;
        }
      }

      return cost;
    }

    @Override
    protected void regionMoved(int region, int oldServer, int newServer) {
      if (maxCost <= 0) {
        return; // no need to compute
      }
      if (cluster.multiServersPerHost) {
        int oldHost = cluster.serverIndexToHostIndex[oldServer];
        int newHost = cluster.serverIndexToHostIndex[newServer];
        if (newHost != oldHost) {
          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
        }
      } else {
        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
      }
    }
  }

  /**
   * A cost function for region replicas for the rack distribution. We give a relatively high
   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
   * though.
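   *
   * <p>Illustrative sketch only: the default rack penalty (10,000) is a tenth of the default
   * host penalty (100,000), and it can be overridden through its multiplier key, e.g.</p>
   * <pre>
   * conf.setFloat("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 20000f);
   * </pre>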
   */
  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
    private static final String REGION_REPLICA_RACK_COST_KEY =
        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;

    public RegionReplicaRackCostFunction(Configuration conf) {
      super(conf);
      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
    }

    @Override
    void init(Cluster cluster) {
      this.cluster = cluster;
      if (cluster.numRacks <= 1) {
        maxCost = 0;
        return; // disabled for 1 rack
      }
      // max cost is the case where every region replica is hosted together regardless of rack
      maxCost = getMaxCost(cluster);
      costsPerGroup = new long[cluster.numRacks];
      for (int i = 0; i < cluster.primariesOfRegionsPerRack.length; i++) {
        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
      }
    }

    @Override
    protected void regionMoved(int region, int oldServer, int newServer) {
      if (maxCost <= 0) {
        return; // no need to compute
      }
      int oldRack = cluster.serverIndexToRackIndex[oldServer];
      int newRack = cluster.serverIndexToRackIndex[newServer];
      if (newRack != oldRack) {
        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
      }
    }
  }

  /**
   * Compute the cost of the total memstore size.  The more unbalanced the higher the
   * computed cost will be.  This uses a rolling average of RegionLoad.
   */
  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {

    private static final String MEMSTORE_SIZE_COST_KEY =
        "hbase.master.balancer.stochastic.memstoreSizeCost";
    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;

    MemStoreSizeCostFunction(Configuration conf) {
      super(conf);
      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
    }

    @Override
    protected double getCostFromRl(BalancerRegionLoad rl) {
      return rl.getMemStoreSizeMB();
    }
  }

  /**
   * Compute the cost of the total open storefile size.  The more unbalanced the higher the
   * computed cost will be.  This uses a rolling average of RegionLoad.
   */
  static class StoreFileCostFunction extends CostFromRegionLoadFunction {

    private static final String STOREFILE_SIZE_COST_KEY =
        "hbase.master.balancer.stochastic.storefileSizeCost";
    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;

    StoreFileCostFunction(Configuration conf) {
      super(conf);
      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
    }

    @Override
    protected double getCostFromRl(BalancerRegionLoad rl) {
      return rl.getStorefileSizeMB();
    }
  }

  /**
   * A helper function to compose the attribute name from the table name and the cost function
   * name.
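   *
   * <p>Illustrative only: assuming TABLE_FUNCTION_SEP is the separator constant defined earlier
   * in this class, a call such as
   * {@code composeAttributeName("usertable", "ReadRequestCostFunction")} yields the table name
   * and the cost function name joined by that separator.</p>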
   */
  public static String composeAttributeName(String tableName, String costFunctionName) {
    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
  }
}