001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.Random;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.ClusterStatus;
037import org.apache.hadoop.hbase.HBaseInterfaceAudience;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.RegionLoad;
040import org.apache.hadoop.hbase.ServerLoad;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.master.MasterServices;
045import org.apache.hadoop.hbase.master.RegionPlan;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
048import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
049import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
050import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
051import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.yetus.audience.InterfaceAudience;
055
056import org.apache.hadoop.hbase.shaded.com.google.common.base.Optional;
057import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
058
059import com.google.common.annotations.VisibleForTesting;
060
061/**
 * <p>This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
065 * <ul>
066 * <li>Region Load</li>
067 * <li>Table Load</li>
068 * <li>Data Locality</li>
069 * <li>Memstore Sizes</li>
070 * <li>Storefile Sizes</li>
071 * </ul>
072 *
073 *
 * <p>Every cost function returns a number between 0 and 1 inclusive, where 0 is the lowest cost
 * (the best solution) and 1 is the highest possible cost (the worst solution). The computed costs
 * are scaled by their respective multipliers:</p>
077 *
078 * <ul>
079 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
080 *   <li>hbase.master.balancer.stochastic.moveCost</li>
081 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
082 *   <li>hbase.master.balancer.stochastic.localityCost</li>
083 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
084 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
085 * </ul>
086 *
087 * <p>In addition to the above configurations, the balancer can be tuned by the following
088 * configuration values:</p>
089 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
 *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
 *   regions (times the number of servers) is multiplied to get the number of times the balancer
 *   will try to mutate the cluster.</li>
 *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
 *   the balancer will try to mutate the cluster. The balancer will use the minimum of this
 *   value and the above computation, or the maximum of the two when
 *   hbase.master.balancer.stochastic.runMaxSteps is set to true.</li>
099 * </ul>
100 *
101 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
102 * so that the balancer gets the full picture of all loads on the cluster.</p>
103 */
104@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
105@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
106  justification="Complaint is about costFunctions not being synchronized; not end of the world")
107public class StochasticLoadBalancer extends BaseLoadBalancer {
108
  /** Config key: per-region coefficient used (with the server count) to size the step budget. */
  protected static final String STEPS_PER_REGION_KEY =
      "hbase.master.balancer.stochastic.stepsPerRegion";
  /** Config key: step cap for one walk (a floor instead when runMaxSteps is true). */
  protected static final String MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.maxSteps";
  /** Config key: when true, use max(maxSteps, computed steps) rather than the minimum. */
  protected static final String RUN_MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.runMaxSteps";
  /** Config key: wall-clock budget for one balanceCluster walk, in milliseconds. */
  protected static final String MAX_RUNNING_TIME_KEY =
      "hbase.master.balancer.stochastic.maxRunningTime";
  /** Config key: how many historical load samples are kept per region. */
  protected static final String KEEP_REGION_LOADS =
      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  private static final String TABLE_FUNCTION_SEP = "_";
  /** Config key: minimum average weighted cost at which needsBalance() asks for a run. */
  protected static final String MIN_COST_NEED_BALANCE_KEY =
      "hbase.master.balancer.stochastic.minCostNeedBalance";

  /** Shared randomness for choosing generators, servers and regions during the walk. */
  protected static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);

  /**
   * Recent load samples per region, keyed by the string form of the byte[] keys reported by
   * ServerLoad#getRegionsLoad(); replaced wholesale by updateRegionLoad().
   */
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // values are defaults; setConf() overrides each one from the configuration
  private int maxSteps = 1000000;
  private boolean runMaxSteps = false;
  private int stepsPerRegion = 800;
  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
  private int numRegionLoadsToRemember = 15;
  private float minCostNeedBalance = 0.05f;

  // Generators proposing actions; only built by setConf() when not already set.
  private List<CandidateGenerator> candidateGenerators;
  // The cost functions that consume region load history (also contained in costFunctions).
  private CostFromRegionLoadFunction[] regionLoadFunctions;
  private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

  // to save and report costs to JMX:
  // curOverallCost/curFunctionCosts hold the best state found so far in the walk, while
  // tempFunctionCosts is per-function scratch space written by computeCost().
  private Double curOverallCost = 0d;
  private Double[] tempFunctionCosts;
  private Double[] curFunctionCosts;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
  // Read from HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE; sizes the JMX metric count.
  private boolean isByTable = false;
  // Table being balanced, set by balanceCluster(TableName, Map); labels the JMX cost metrics.
  private TableName tableName = null;
154
  /**
   * Creates the balancer, passing a {@link MetricsStochasticBalancer} to BaseLoadBalancer in
   * place of its default MetricsBalancer so that the per-cost-function metrics published by
   * updateStochasticCosts can be reported.
   */
  public StochasticLoadBalancer() {
    super(new MetricsStochasticBalancer());
  }
162
163  @Override
164  public void onConfigurationChange(Configuration conf) {
165    setConf(conf);
166  }
167
168  @Override
169  public synchronized void setConf(Configuration conf) {
170    super.setConf(conf);
171    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
172    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
173    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
174    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
175
176    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
177    isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
178    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
179    if (localityCandidateGenerator == null) {
180      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
181    }
182    localityCost = new ServerLocalityCostFunction(conf, services);
183    rackLocalityCost = new RackLocalityCostFunction(conf, services);
184
185    if (this.candidateGenerators == null) {
186      candidateGenerators = Lists.newArrayList();
187      candidateGenerators.add(new RandomCandidateGenerator());
188      candidateGenerators.add(new LoadCandidateGenerator());
189      candidateGenerators.add(localityCandidateGenerator);
190      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
191    }
192    regionLoadFunctions = new CostFromRegionLoadFunction[] {
193      new ReadRequestCostFunction(conf),
194      new WriteRequestCostFunction(conf),
195      new MemStoreSizeCostFunction(conf),
196      new StoreFileCostFunction(conf)
197    };
198    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
199    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
200    costFunctions = new CostFunction[]{
201      new RegionCountSkewCostFunction(conf),
202      new PrimaryRegionCountSkewCostFunction(conf),
203      new MoveCostFunction(conf),
204      localityCost,
205      rackLocalityCost,
206      new TableSkewCostFunction(conf),
207      regionReplicaHostCostFunction,
208      regionReplicaRackCostFunction,
209      regionLoadFunctions[0],
210      regionLoadFunctions[1],
211      regionLoadFunctions[2],
212      regionLoadFunctions[3],
213    };
214    curFunctionCosts= new Double[costFunctions.length];
215    tempFunctionCosts= new Double[costFunctions.length];
216    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
217        ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
218  }
219
220  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
221    this.candidateGenerators = customCandidateGenerators;
222  }
223
224  @Override
225  protected void setSlop(Configuration conf) {
226    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
227  }
228
229  @Override
230  public synchronized void setClusterStatus(ClusterStatus st) {
231    super.setClusterStatus(st);
232    updateRegionLoad();
233    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
234      cost.setClusterStatus(st);
235    }
236
237    // update metrics size
238    try {
239      // by-table or ensemble mode
240      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
241      int functionsCount = getCostFunctionNames().length;
242
243      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
244    } catch (Exception e) {
245      LOG.error("failed to get the size of all tables", e);
246    }
247  }
248
249  /**
250   * Update the number of metrics that are reported to JMX
251   */
252  public void updateMetricsSize(int size) {
253    if (metricsBalancer instanceof MetricsStochasticBalancer) {
254        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
255    }
256  }
257
258  @Override
259  public synchronized void setMasterServices(MasterServices masterServices) {
260    super.setMasterServices(masterServices);
261    this.localityCost.setServices(masterServices);
262    this.rackLocalityCost.setServices(masterServices);
263    this.localityCandidateGenerator.setServices(masterServices);
264  }
265
266  @Override
267  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
268    regionReplicaHostCostFunction.init(c);
269    if (regionReplicaHostCostFunction.cost() > 0) return true;
270    regionReplicaRackCostFunction.init(c);
271    if (regionReplicaRackCostFunction.cost() > 0) return true;
272    return false;
273  }
274
275  @Override
276  protected boolean needsBalance(Cluster cluster) {
277    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
278    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
279      if (LOG.isDebugEnabled()) {
280        LOG.debug("Not running balancer because only " + cs.getNumServers()
281            + " active regionserver(s)");
282      }
283      return false;
284    }
285    if (areSomeRegionReplicasColocated(cluster)) {
286      return true;
287    }
288
289    double total = 0.0;
290    float sumMultiplier = 0.0f;
291    for (CostFunction c : costFunctions) {
292      float multiplier = c.getMultiplier();
293      if (multiplier <= 0) {
294        continue;
295      }
296      if (!c.isNeeded()) {
297        LOG.debug(c.getClass().getName() + " indicated that its cost should not be considered");
298        continue;
299      }
300      sumMultiplier += multiplier;
301      total += c.cost() * multiplier;
302    }
303
304    if (total <= 0 || sumMultiplier <= 0
305        || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) {
306      if (LOG.isTraceEnabled()) {
307        LOG.trace("Skipping load balancing because balanced cluster; " + "total cost is " + total
308          + ", sum multiplier is " + sumMultiplier + " min cost which need balance is "
309          + minCostNeedBalance);
310      }
311      return false;
312    }
313    return true;
314  }
315
316  @Override
317  public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
318    List<RegionInfo>> clusterState) {
319    this.tableName = tableName;
320    return balanceCluster(clusterState);
321  }
322
323  @VisibleForTesting
324  Cluster.Action nextAction(Cluster cluster) {
325    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
326            .generate(cluster);
327  }
328
329  /**
330   * Given the cluster state this will try and approach an optimal balance. This
331   * should always approach the optimal state given enough steps.
332   */
333  @Override
334  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
335    List<RegionInfo>> clusterState) {
336    List<RegionPlan> plans = balanceMasterRegions(clusterState);
337    if (plans != null || clusterState == null || clusterState.size() <= 1) {
338      return plans;
339    }
340
341    if (masterServerName != null && clusterState.containsKey(masterServerName)) {
342      if (clusterState.size() <= 2) {
343        return null;
344      }
345      clusterState = new HashMap<>(clusterState);
346      clusterState.remove(masterServerName);
347    }
348
349    // On clusters with lots of HFileLinks or lots of reference files,
350    // instantiating the storefile infos can be quite expensive.
351    // Allow turning this feature off if the locality cost is not going to
352    // be used in any computations.
353    RegionLocationFinder finder = null;
354    if (this.localityCost != null && this.localityCost.getMultiplier() > 0
355        || this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0) {
356      finder = this.regionFinder;
357    }
358
359    //The clusterState that is given to this method contains the state
360    //of all the regions in the table(s) (that's true today)
361    // Keep track of servers to iterate through them.
362    Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
363
364    long startTime = EnvironmentEdgeManager.currentTime();
365
366    initCosts(cluster);
367
368    if (!needsBalance(cluster)) {
369      return null;
370    }
371
372    double currentCost = computeCost(cluster, Double.MAX_VALUE);
373    curOverallCost = currentCost;
374    for (int i = 0; i < this.curFunctionCosts.length; i++) {
375      curFunctionCosts[i] = tempFunctionCosts[i];
376    }
377    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
378        + functionCost());
379
380    double initCost = currentCost;
381    double newCost = currentCost;
382
383    long computedMaxSteps;
384    if (runMaxSteps) {
385      computedMaxSteps = Math.max(this.maxSteps,
386          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
387    } else {
388      computedMaxSteps = Math.min(this.maxSteps,
389          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
390    }
391    // Perform a stochastic walk to see if we can get a good fit.
392    long step;
393
394    for (step = 0; step < computedMaxSteps; step++) {
395      Cluster.Action action = nextAction(cluster);
396
397      if (action.type == Type.NULL) {
398        continue;
399      }
400
401      cluster.doAction(action);
402      updateCostsWithAction(cluster, action);
403
404      newCost = computeCost(cluster, currentCost);
405
406      // Should this be kept?
407      if (newCost < currentCost) {
408        currentCost = newCost;
409
410        // save for JMX
411        curOverallCost = currentCost;
412        for (int i = 0; i < this.curFunctionCosts.length; i++) {
413          curFunctionCosts[i] = tempFunctionCosts[i];
414        }
415      } else {
416        // Put things back the way they were before.
417        // TODO: undo by remembering old values
418        Action undoAction = action.undoAction();
419        cluster.doAction(undoAction);
420        updateCostsWithAction(cluster, undoAction);
421      }
422
423      if (EnvironmentEdgeManager.currentTime() - startTime >
424          maxRunningTime) {
425        break;
426      }
427    }
428    long endTime = EnvironmentEdgeManager.currentTime();
429
430    metricsBalancer.balanceCluster(endTime - startTime);
431
432    // update costs metrics
433    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
434    if (initCost > currentCost) {
435      plans = createRegionPlans(cluster);
436      if (LOG.isDebugEnabled()) {
437        LOG.debug("Finished computing new load balance plan.  Computation took "
438            + (endTime - startTime) + "ms to try " + step
439            + " different iterations.  Found a solution that moves "
440            + plans.size() + " regions; Going from a computed cost of "
441            + initCost + " to a new cost of " + currentCost);
442      }
443
444      return plans;
445    }
446    if (LOG.isDebugEnabled()) {
447      LOG.debug("Could not find a better load balance plan.  Tried "
448          + step + " different configurations in " + (endTime - startTime)
449          + "ms, and did not find anything with a computed cost less than " + initCost);
450    }
451    return null;
452  }
453
454  /**
455   * update costs to JMX
456   */
457  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
458    if (tableName == null) return;
459
460    // check if the metricsBalancer is MetricsStochasticBalancer before casting
461    if (metricsBalancer instanceof MetricsStochasticBalancer) {
462      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
463      // overall cost
464      balancer.updateStochasticCost(tableName.getNameAsString(),
465        "Overall", "Overall cost", overall);
466
467      // each cost function
468      for (int i = 0; i < costFunctions.length; i++) {
469        CostFunction costFunction = costFunctions[i];
470        String costFunctionName = costFunction.getClass().getSimpleName();
471        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
472        // TODO: cost function may need a specific description
473        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
474          "The percent of " + costFunctionName, costPercent);
475      }
476    }
477  }
478
479  private String functionCost() {
480    StringBuilder builder = new StringBuilder();
481    for (CostFunction c:costFunctions) {
482      builder.append(c.getClass().getSimpleName());
483      builder.append(" : (");
484      builder.append(c.getMultiplier());
485      builder.append(", ");
486      builder.append(c.cost());
487      builder.append("); ");
488    }
489    return builder.toString();
490  }
491
492  /**
493   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
494   * state.
495   *
496   * @param cluster The state of the cluster
497   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
498   */
499  private List<RegionPlan> createRegionPlans(Cluster cluster) {
500    List<RegionPlan> plans = new LinkedList<>();
501    for (int regionIndex = 0;
502         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
503      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
504      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
505
506      if (initialServerIndex != newServerIndex) {
507        RegionInfo region = cluster.regions[regionIndex];
508        ServerName initialServer = cluster.servers[initialServerIndex];
509        ServerName newServer = cluster.servers[newServerIndex];
510
511        if (LOG.isTraceEnabled()) {
512          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
513              + initialServer.getHostname() + " to " + newServer.getHostname());
514        }
515        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
516        plans.add(rp);
517      }
518    }
519    return plans;
520  }
521
522  /**
523   * Store the current region loads.
524   */
525  private synchronized void updateRegionLoad() {
526    // We create a new hashmap so that regions that are no longer there are removed.
527    // However we temporarily need the old loads so we can use them to keep the rolling average.
528    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
529    loads = new HashMap<>();
530
531    for (ServerName sn : clusterStatus.getServers()) {
532      ServerLoad sl = clusterStatus.getLoad(sn);
533      if (sl == null) {
534        continue;
535      }
536      for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
537        Deque<BalancerRegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
538        if (rLoads == null) {
539          // There was nothing there
540          rLoads = new ArrayDeque<>();
541        } else if (rLoads.size() >= numRegionLoadsToRemember) {
542          rLoads.remove();
543        }
544        rLoads.add(new BalancerRegionLoad(entry.getValue()));
545        loads.put(Bytes.toString(entry.getKey()), rLoads);
546      }
547    }
548
549    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
550      cost.setLoads(loads);
551    }
552  }
553
554  protected void initCosts(Cluster cluster) {
555    for (CostFunction c:costFunctions) {
556      c.init(cluster);
557    }
558  }
559
560  protected void updateCostsWithAction(Cluster cluster, Action action) {
561    for (CostFunction c : costFunctions) {
562      c.postAction(action);
563    }
564  }
565
566  /**
567   * Get the names of the cost functions
568   */
569  public String[] getCostFunctionNames() {
570    if (costFunctions == null) return null;
571    String[] ret = new String[costFunctions.length];
572    for (int i = 0; i < costFunctions.length; i++) {
573      CostFunction c = costFunctions[i];
574      ret[i] = c.getClass().getSimpleName();
575    }
576
577    return ret;
578  }
579
580  /**
581   * This is the main cost function.  It will compute a cost associated with a proposed cluster
582   * state.  All different costs will be combined with their multipliers to produce a double cost.
583   *
584   * @param cluster The state of the cluster
585   * @param previousCost the previous cost. This is used as an early out.
586   * @return a double of a cost associated with the proposed cluster state.  This cost is an
587   *         aggregate of all individual cost functions.
588   */
589  protected double computeCost(Cluster cluster, double previousCost) {
590    double total = 0;
591
592    for (int i = 0; i < costFunctions.length; i++) {
593      CostFunction c = costFunctions[i];
594      this.tempFunctionCosts[i] = 0.0;
595
596      if (c.getMultiplier() <= 0) {
597        continue;
598      }
599
600      Float multiplier = c.getMultiplier();
601      Double cost = c.cost();
602
603      this.tempFunctionCosts[i] = multiplier*cost;
604      total += this.tempFunctionCosts[i];
605
606      if (total > previousCost) {
607        break;
608      }
609    }
610
611    return total;
612  }
613
614  /** Generates a candidate action to be applied to the cluster for cost function search */
615  abstract static class CandidateGenerator {
616    abstract Cluster.Action generate(Cluster cluster);
617
618    /**
619     * From a list of regions pick a random one. Null can be returned which
620     * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
621     * rather than swap.
622     *
623     * @param cluster        The state of the cluster
624     * @param server         index of the server
625     * @param chanceOfNoSwap Chance that this will decide to try a move rather
626     *                       than a swap.
627     * @return a random {@link RegionInfo} or null if an asymmetrical move is
628     *         suggested.
629     */
630    protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
631      // Check to see if this is just a move.
632      if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
633        // signal a move only.
634        return -1;
635      }
636      int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
637      return cluster.regionsPerServer[server][rand];
638
639    }
640    protected int pickRandomServer(Cluster cluster) {
641      if (cluster.numServers < 1) {
642        return -1;
643      }
644
645      return RANDOM.nextInt(cluster.numServers);
646    }
647
648    protected int pickRandomRack(Cluster cluster) {
649      if (cluster.numRacks < 1) {
650        return -1;
651      }
652
653      return RANDOM.nextInt(cluster.numRacks);
654    }
655
656    protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
657      if (cluster.numServers < 2) {
658        return -1;
659      }
660      while (true) {
661        int otherServerIndex = pickRandomServer(cluster);
662        if (otherServerIndex != serverIndex) {
663          return otherServerIndex;
664        }
665      }
666    }
667
668    protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
669      if (cluster.numRacks < 2) {
670        return -1;
671      }
672      while (true) {
673        int otherRackIndex = pickRandomRack(cluster);
674        if (otherRackIndex != rackIndex) {
675          return otherRackIndex;
676        }
677      }
678    }
679
680    protected Cluster.Action pickRandomRegions(Cluster cluster,
681                                                       int thisServer,
682                                                       int otherServer) {
683      if (thisServer < 0 || otherServer < 0) {
684        return Cluster.NullAction;
685      }
686
687      // Decide who is most likely to need another region
688      int thisRegionCount = cluster.getNumRegions(thisServer);
689      int otherRegionCount = cluster.getNumRegions(otherServer);
690
691      // Assign the chance based upon the above
692      double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
693      double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
694
695      int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
696      int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
697
698      return getAction(thisServer, thisRegion, otherServer, otherRegion);
699    }
700
701    protected Cluster.Action getAction(int fromServer, int fromRegion,
702        int toServer, int toRegion) {
703      if (fromServer < 0 || toServer < 0) {
704        return Cluster.NullAction;
705      }
706      if (fromRegion > 0 && toRegion > 0) {
707        return new Cluster.SwapRegionsAction(fromServer, fromRegion,
708          toServer, toRegion);
709      } else if (fromRegion > 0) {
710        return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
711      } else if (toRegion > 0) {
712        return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
713      } else {
714        return Cluster.NullAction;
715      }
716    }
717
718    /**
719     * Returns a random iteration order of indexes of an array with size length
720     */
721    protected List<Integer> getRandomIterationOrder(int length) {
722      ArrayList<Integer> order = new ArrayList<>(length);
723      for (int i = 0; i < length; i++) {
724        order.add(i);
725      }
726      Collections.shuffle(order);
727      return order;
728    }
729  }
730
731  static class RandomCandidateGenerator extends CandidateGenerator {
732
733    @Override
734    Cluster.Action generate(Cluster cluster) {
735
736      int thisServer = pickRandomServer(cluster);
737
738      // Pick the other server
739      int otherServer = pickOtherRandomServer(cluster, thisServer);
740
741      return pickRandomRegions(cluster, thisServer, otherServer);
742    }
743  }
744
745  static class LoadCandidateGenerator extends CandidateGenerator {
746
747    @Override
748    Cluster.Action generate(Cluster cluster) {
749      cluster.sortServersByRegionCount();
750      int thisServer = pickMostLoadedServer(cluster, -1);
751      int otherServer = pickLeastLoadedServer(cluster, thisServer);
752
753      return pickRandomRegions(cluster, thisServer, otherServer);
754    }
755
756    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
757      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
758
759      int index = 0;
760      while (servers[index] == null || servers[index] == thisServer) {
761        index++;
762        if (index == servers.length) {
763          return -1;
764        }
765      }
766      return servers[index];
767    }
768
769    private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
770      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
771
772      int index = servers.length - 1;
773      while (servers[index] == null || servers[index] == thisServer) {
774        index--;
775        if (index < 0) {
776          return -1;
777        }
778      }
779      return servers[index];
780    }
781  }
782
783  static class LocalityBasedCandidateGenerator extends CandidateGenerator {
784
    // Master services for locality-aware selection; when null, generate() falls back to a
    // random move/swap between two servers.
    private MasterServices masterServices;

    /**
     * @param masterServices master services used for locality-aware candidate selection; may be
     *                       null, in which case generate() proposes random moves/swaps
     */
    LocalityBasedCandidateGenerator(MasterServices masterServices) {
      this.masterServices = masterServices;
    }
790
791    @Override
792    Cluster.Action generate(Cluster cluster) {
793      if (this.masterServices == null) {
794        int thisServer = pickRandomServer(cluster);
795        // Pick the other server
796        int otherServer = pickOtherRandomServer(cluster, thisServer);
797        return pickRandomRegions(cluster, thisServer, otherServer);
798      }
799
800      // Randomly iterate through regions until you find one that is not on ideal host
801      for (int region : getRandomIterationOrder(cluster.numRegions)) {
802        int currentServer = cluster.regionIndexToServerIndex[region];
803        if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
804          Optional<Action> potential = tryMoveOrSwap(
805              cluster,
806              currentServer,
807              region,
808              cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
809          );
810          if (potential.isPresent()) {
811            return potential.get();
812          }
813        }
814      }
815      return Cluster.NullAction;
816    }
817
818    /**
819     * Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
820     * Returns empty optional if no move can be found
821     */
822    private Optional<Action> tryMoveOrSwap(Cluster cluster,
823                                           int fromServer,
824                                           int fromRegion,
825                                           int toServer) {
826      // Try move first. We know apriori fromRegion has the highest locality on toServer
827      if (cluster.serverHasTooFewRegions(toServer)) {
828        return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
829      }
830
831      // Compare locality gain/loss from swapping fromRegion with regions on toServer
832      double fromRegionLocalityDelta =
833          getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
834      for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
835        int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
836        double toRegionLocalityDelta =
837            getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
838        // If locality would remain neutral or improve, attempt the swap
839        if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
840          return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
841        }
842      }
843
844      return Optional.absent();
845    }
846
847    private double getWeightedLocality(Cluster cluster, int region, int server) {
848      return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
849    }
850
851    void setServices(MasterServices services) {
852      this.masterServices = services;
853    }
854  }
855
856  /**
   * Generates candidates which move a replica out of a region server that co-hosts
   * replicas of the same region
859   */
860  static class RegionReplicaCandidateGenerator extends CandidateGenerator {
861
862    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
863
864    /**
865     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
866     * (a group is a server, host or rack)
867     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
868     * primariesOfRegionsPerHost or primariesOfRegionsPerRack
869     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
870     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
871     * @return a regionIndex for the selected primary or -1 if there is no co-locating
872     */
873    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
874        , int[] regionIndexToPrimaryIndex) {
875      int currentPrimary = -1;
876      int currentPrimaryIndex = -1;
877      int selectedPrimaryIndex = -1;
878      double currentLargestRandom = -1;
879      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
880      // ids for the regions hosted in server, a consecutive repetition means that replicas
881      // are co-hosted
882      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
883        int primary = j < primariesOfRegionsPerGroup.length
884            ? primariesOfRegionsPerGroup[j] : -1;
885        if (primary != currentPrimary) { // check for whether we see a new primary
886          int numReplicas = j - currentPrimaryIndex;
887          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
888            // decide to select this primary region id or not
889            double currentRandom = RANDOM.nextDouble();
890            // we don't know how many region replicas are co-hosted, we will randomly select one
891            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
892            if (currentRandom > currentLargestRandom) {
893              selectedPrimaryIndex = currentPrimary;
894              currentLargestRandom = currentRandom;
895            }
896          }
897          currentPrimary = primary;
898          currentPrimaryIndex = j;
899        }
900      }
901
902      // we have found the primary id for the region to move. Now find the actual regionIndex
903      // with the given primary, prefer to move the secondary region.
904      for (int j = 0; j < regionsPerGroup.length; j++) {
905        int regionIndex = regionsPerGroup[j];
906        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
907          // always move the secondary, not the primary
908          if (selectedPrimaryIndex != regionIndex) {
909            return regionIndex;
910          }
911        }
912      }
913      return -1;
914    }
915
916    @Override
917    Cluster.Action generate(Cluster cluster) {
918      int serverIndex = pickRandomServer(cluster);
919      if (cluster.numServers <= 1 || serverIndex == -1) {
920        return Cluster.NullAction;
921      }
922
923      int regionIndex = selectCoHostedRegionPerGroup(
924        cluster.primariesOfRegionsPerServer[serverIndex],
925        cluster.regionsPerServer[serverIndex],
926        cluster.regionIndexToPrimaryIndex);
927
928      // if there are no pairs of region replicas co-hosted, default to random generator
929      if (regionIndex == -1) {
930        // default to randompicker
931        return randomGenerator.generate(cluster);
932      }
933
934      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
935      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
936      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
937    }
938  }
939
940  /**
   * Generates candidates which move a replica out of a rack that co-hosts
   * replicas of the same region
943   */
944  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
945    @Override
946    Cluster.Action generate(Cluster cluster) {
947      int rackIndex = pickRandomRack(cluster);
948      if (cluster.numRacks <= 1 || rackIndex == -1) {
949        return super.generate(cluster);
950      }
951
952      int regionIndex = selectCoHostedRegionPerGroup(
953        cluster.primariesOfRegionsPerRack[rackIndex],
954        cluster.regionsPerRack[rackIndex],
955        cluster.regionIndexToPrimaryIndex);
956
957      // if there are no pairs of region replicas co-hosted, default to random generator
958      if (regionIndex == -1) {
959        // default to randompicker
960        return randomGenerator.generate(cluster);
961      }
962
963      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
964      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
965
966      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
967      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
968      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
969      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
970    }
971  }
972
973  /**
974   * Base class of StochasticLoadBalancer's Cost Functions.
975   */
976  abstract static class CostFunction {
977
978    private float multiplier = 0;
979
980    protected Cluster cluster;
981
982    CostFunction(Configuration c) {
983    }
984
985    boolean isNeeded() {
986      return true;
987    }
988    float getMultiplier() {
989      return multiplier;
990    }
991
992    void setMultiplier(float m) {
993      this.multiplier = m;
994    }
995
996    /** Called once per LB invocation to give the cost function
997     * to initialize it's state, and perform any costly calculation.
998     */
999    void init(Cluster cluster) {
1000      this.cluster = cluster;
1001    }
1002
1003    /** Called once per cluster Action to give the cost function
1004     * an opportunity to update it's state. postAction() is always
1005     * called at least once before cost() is called with the cluster
1006     * that this action is performed on. */
1007    void postAction(Action action) {
1008      switch (action.type) {
1009      case NULL: break;
1010      case ASSIGN_REGION:
1011        AssignRegionAction ar = (AssignRegionAction) action;
1012        regionMoved(ar.region, -1, ar.server);
1013        break;
1014      case MOVE_REGION:
1015        MoveRegionAction mra = (MoveRegionAction) action;
1016        regionMoved(mra.region, mra.fromServer, mra.toServer);
1017        break;
1018      case SWAP_REGIONS:
1019        SwapRegionsAction a = (SwapRegionsAction) action;
1020        regionMoved(a.fromRegion, a.fromServer, a.toServer);
1021        regionMoved(a.toRegion, a.toServer, a.fromServer);
1022        break;
1023      default:
1024        throw new RuntimeException("Uknown action:" + action.type);
1025      }
1026    }
1027
1028    protected void regionMoved(int region, int oldServer, int newServer) {
1029    }
1030
1031    abstract double cost();
1032
1033    /**
1034     * Function to compute a scaled cost using {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics}.
1035     * It assumes that this is a zero sum set of costs.  It assumes that the worst case
1036     * possible is all of the elements in one region server and the rest having 0.
1037     *
1038     * @param stats the costs
1039     * @return a scaled set of costs.
1040     */
1041    protected double costFromArray(double[] stats) {
1042      double totalCost = 0;
1043      double total = getSum(stats);
1044
1045      double count = stats.length;
1046      double mean = total/count;
1047
1048      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
1049      // a zero sum cost for this to make sense.
1050      double max = ((count - 1) * mean) + (total - mean);
1051
1052      // It's possible that there aren't enough regions to go around
1053      double min;
1054      if (count > total) {
1055        min = ((count - total) * mean) + ((1 - mean) * total);
1056      } else {
1057        // Some will have 1 more than everything else.
1058        int numHigh = (int) (total - (Math.floor(mean) * count));
1059        int numLow = (int) (count - numHigh);
1060
1061        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
1062
1063      }
1064      min = Math.max(0, min);
1065      for (int i=0; i<stats.length; i++) {
1066        double n = stats[i];
1067        double diff = Math.abs(mean - n);
1068        totalCost += diff;
1069      }
1070
1071      double scaled =  scale(min, max, totalCost);
1072      return scaled;
1073    }
1074
1075    private double getSum(double[] stats) {
1076      double total = 0;
1077      for(double s:stats) {
1078        total += s;
1079      }
1080      return total;
1081    }
1082
1083    /**
1084     * Scale the value between 0 and 1.
1085     *
1086     * @param min   Min value
1087     * @param max   The Max value
1088     * @param value The value to be scaled.
1089     * @return The scaled value.
1090     */
1091    protected double scale(double min, double max, double value) {
1092      if (max <= min || value <= min) {
1093        return 0;
1094      }
1095      if ((max - min) == 0) return 0;
1096
1097      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
1098    }
1099  }
1100
1101  /**
1102   * Given the starting state of the regions and a potential ending state
1103   * compute cost based upon the number of regions that have moved.
1104   */
1105  static class MoveCostFunction extends CostFunction {
1106    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
1107    private static final String MAX_MOVES_PERCENT_KEY =
1108        "hbase.master.balancer.stochastic.maxMovePercent";
1109    private static final float DEFAULT_MOVE_COST = 7;
1110    private static final int DEFAULT_MAX_MOVES = 600;
1111    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1112
1113    private final float maxMovesPercent;
1114
1115    MoveCostFunction(Configuration conf) {
1116      super(conf);
1117
1118      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
1119      // that large benefits are need to overcome the cost of a move.
1120      this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1121      // What percent of the number of regions a single run of the balancer can move.
1122      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1123    }
1124
1125    @Override
1126    double cost() {
1127      // Try and size the max number of Moves, but always be prepared to move some.
1128      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1129          DEFAULT_MAX_MOVES);
1130
1131      double moveCost = cluster.numMovedRegions;
1132
1133      // Don't let this single balance move more than the max moves.
1134      // This allows better scaling to accurately represent the actual cost of a move.
1135      if (moveCost > maxMoves) {
1136        return 1000000;   // return a number much greater than any of the other cost
1137      }
1138
1139      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
1140    }
1141  }
1142
1143  /**
1144   * Compute the cost of a potential cluster state from skew in number of
1145   * regions on a cluster.
1146   */
1147  static class RegionCountSkewCostFunction extends CostFunction {
1148    private static final String REGION_COUNT_SKEW_COST_KEY =
1149        "hbase.master.balancer.stochastic.regionCountCost";
1150    private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1151
1152    private double[] stats = null;
1153
1154    RegionCountSkewCostFunction(Configuration conf) {
1155      super(conf);
1156      // Load multiplier should be the greatest as it is the most general way to balance data.
1157      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1158    }
1159
1160    @Override
1161    double cost() {
1162      if (stats == null || stats.length != cluster.numServers) {
1163        stats = new double[cluster.numServers];
1164      }
1165
1166      for (int i =0; i < cluster.numServers; i++) {
1167        stats[i] = cluster.regionsPerServer[i].length;
1168      }
1169
1170      return costFromArray(stats);
1171    }
1172  }
1173
1174  /**
1175   * Compute the cost of a potential cluster state from skew in number of
1176   * primary regions on a cluster.
1177   */
1178  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1179    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1180        "hbase.master.balancer.stochastic.primaryRegionCountCost";
1181    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1182
1183    private double[] stats = null;
1184
1185    PrimaryRegionCountSkewCostFunction(Configuration conf) {
1186      super(conf);
1187      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1188      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1189        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1190    }
1191
1192    @Override
1193    double cost() {
1194      if (!cluster.hasRegionReplicas) {
1195        return 0;
1196      }
1197      if (stats == null || stats.length != cluster.numServers) {
1198        stats = new double[cluster.numServers];
1199      }
1200
1201      for (int i = 0; i < cluster.numServers; i++) {
1202        stats[i] = 0;
1203        for (int regionIdx : cluster.regionsPerServer[i]) {
1204          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1205            stats[i]++;
1206          }
1207        }
1208      }
1209
1210      return costFromArray(stats);
1211    }
1212  }
1213
1214  /**
1215   * Compute the cost of a potential cluster configuration based upon how evenly
1216   * distributed tables are.
1217   */
1218  static class TableSkewCostFunction extends CostFunction {
1219
1220    private static final String TABLE_SKEW_COST_KEY =
1221        "hbase.master.balancer.stochastic.tableSkewCost";
1222    private static final float DEFAULT_TABLE_SKEW_COST = 35;
1223
1224    TableSkewCostFunction(Configuration conf) {
1225      super(conf);
1226      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1227    }
1228
1229    @Override
1230    double cost() {
1231      double max = cluster.numRegions;
1232      double min = ((double) cluster.numRegions) / cluster.numServers;
1233      double value = 0;
1234
1235      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1236        value += cluster.numMaxRegionsPerTable[i];
1237      }
1238
1239      return scale(min, max, value);
1240    }
1241  }
1242
1243  /**
1244   * Compute a cost of a potential cluster configuration based upon where
1245   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
1246   */
1247  static abstract class LocalityBasedCostFunction extends CostFunction {
1248
1249    private final LocalityType type;
1250
1251    private double bestLocality; // best case locality across cluster weighted by local data size
1252    private double locality; // current locality across cluster weighted by local data size
1253
1254    private MasterServices services;
1255
1256    LocalityBasedCostFunction(Configuration conf,
1257                              MasterServices srv,
1258                              LocalityType type,
1259                              String localityCostKey,
1260                              float defaultLocalityCost) {
1261      super(conf);
1262      this.type = type;
1263      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1264      this.services = srv;
1265      this.locality = 0.0;
1266      this.bestLocality = 0.0;
1267    }
1268
1269    /**
1270     * Maps region to the current entity (server or rack) on which it is stored
1271     */
1272    abstract int regionIndexToEntityIndex(int region);
1273
1274    public void setServices(MasterServices srvc) {
1275      this.services = srvc;
1276    }
1277
1278    @Override
1279    void init(Cluster cluster) {
1280      super.init(cluster);
1281      locality = 0.0;
1282      bestLocality = 0.0;
1283
1284      // If no master, no computation will work, so assume 0 cost
1285      if (this.services == null) {
1286        return;
1287      }
1288
1289      for (int region = 0; region < cluster.numRegions; region++) {
1290        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1291        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1292      }
1293
1294      // We normalize locality to be a score between 0 and 1.0 representing how good it
1295      // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1296      // (and the cost is 0)
1297      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1298    }
1299
1300    @Override
1301    protected void regionMoved(int region, int oldServer, int newServer) {
1302      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1303      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1304      if (this.services == null) {
1305        return;
1306      }
1307      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1308      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1309      locality += normalizedDelta;
1310    }
1311
1312    @Override
1313    double cost() {
1314      return 1 - locality;
1315    }
1316
1317    private int getMostLocalEntityForRegion(int region) {
1318      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1319    }
1320
1321    private double getWeightedLocality(int region, int entity) {
1322      return cluster.getOrComputeWeightedLocality(region, entity, type);
1323    }
1324
1325  }
1326
1327  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1328
1329    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1330    private static final float DEFAULT_LOCALITY_COST = 25;
1331
1332    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1333      super(
1334          conf,
1335          srv,
1336          LocalityType.SERVER,
1337          LOCALITY_COST_KEY,
1338          DEFAULT_LOCALITY_COST
1339      );
1340    }
1341
1342    @Override
1343    int regionIndexToEntityIndex(int region) {
1344      return cluster.regionIndexToServerIndex[region];
1345    }
1346  }
1347
1348  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1349
1350    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1351    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1352
1353    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1354      super(
1355          conf,
1356          services,
1357          LocalityType.RACK,
1358          RACK_LOCALITY_COST_KEY,
1359          DEFAULT_RACK_LOCALITY_COST
1360      );
1361    }
1362
1363    @Override
1364    int regionIndexToEntityIndex(int region) {
1365      return cluster.getRackForRegion(region);
1366    }
1367  }
1368
1369  /**
   * Base class that allows writing cost functions from a rolling average of some
   * number from RegionLoad.
1372   */
1373  abstract static class CostFromRegionLoadFunction extends CostFunction {
1374
1375    private ClusterStatus clusterStatus = null;
1376    private Map<String, Deque<BalancerRegionLoad>> loads = null;
1377    private double[] stats = null;
1378    CostFromRegionLoadFunction(Configuration conf) {
1379      super(conf);
1380    }
1381
1382    void setClusterStatus(ClusterStatus status) {
1383      this.clusterStatus = status;
1384    }
1385
1386    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1387      this.loads = l;
1388    }
1389
1390    @Override
1391    double cost() {
1392      if (clusterStatus == null || loads == null) {
1393        return 0;
1394      }
1395
1396      if (stats == null || stats.length != cluster.numServers) {
1397        stats = new double[cluster.numServers];
1398      }
1399
1400      for (int i =0; i < stats.length; i++) {
1401        //Cost this server has from RegionLoad
1402        long cost = 0;
1403
1404        // for every region on this server get the rl
1405        for(int regionIndex:cluster.regionsPerServer[i]) {
1406          Collection<BalancerRegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1407
1408          // Now if we found a region load get the type of cost that was requested.
1409          if (regionLoadList != null) {
1410            cost += getRegionLoadCost(regionLoadList);
1411          }
1412        }
1413
1414        // Add the total cost to the stats.
1415        stats[i] = cost;
1416      }
1417
1418      // Now return the scaled cost from data held in the stats object.
1419      return costFromArray(stats);
1420    }
1421
1422    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1423      double cost = 0;
1424      for (BalancerRegionLoad rl : regionLoadList) {
1425        cost += getCostFromRl(rl);
1426      }
1427      return cost / regionLoadList.size();
1428    }
1429
1430    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1431  }
1432
1433  /**
1434   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
   * We do not care about the actual rate in requests per second but rather the rate relative
   * to the rest of the regions.
1437   */
1438  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1439
1440    CostFromRegionLoadAsRateFunction(Configuration conf) {
1441      super(conf);
1442    }
1443
1444    @Override
1445    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1446      double cost = 0;
1447      double previous = 0;
1448      boolean isFirst = true;
1449      for (BalancerRegionLoad rl : regionLoadList) {
1450        double current = getCostFromRl(rl);
1451        if (isFirst) {
1452          isFirst = false;
1453        } else {
1454          cost += current - previous;
1455        }
1456        previous = current;
1457      }
1458      return Math.max(0, cost / (regionLoadList.size() - 1));
1459    }
1460  }
1461
1462  /**
   * Compute the cost of total number of read requests. The more unbalanced the higher the
1464   * computed cost will be.  This uses a rolling average of regionload.
1465   */
1466
1467  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1468
1469    private static final String READ_REQUEST_COST_KEY =
1470        "hbase.master.balancer.stochastic.readRequestCost";
1471    private static final float DEFAULT_READ_REQUEST_COST = 5;
1472
1473    ReadRequestCostFunction(Configuration conf) {
1474      super(conf);
1475      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1476    }
1477
1478    @Override
1479    protected double getCostFromRl(BalancerRegionLoad rl) {
1480      return rl.getReadRequestsCount();
1481    }
1482  }
1483
1484  /**
1485   * Compute the cost of total number of write requests.  The more unbalanced the higher the
1486   * computed cost will be.  This uses a rolling average of regionload.
1487   */
1488  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1489
1490    private static final String WRITE_REQUEST_COST_KEY =
1491        "hbase.master.balancer.stochastic.writeRequestCost";
1492    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1493
1494    WriteRequestCostFunction(Configuration conf) {
1495      super(conf);
1496      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1497    }
1498
1499    @Override
1500    protected double getCostFromRl(BalancerRegionLoad rl) {
1501      return rl.getWriteRequestsCount();
1502    }
1503  }
1504
1505  /**
1506   * A cost function for region replicas. We give a very high cost to hosting
1507   * replicas of the same region in the same host. We do not prevent the case
1508   * though, since if numReplicas > numRegionServers, we still want to keep the
1509   * replica open.
1510   */
1511  static class RegionReplicaHostCostFunction extends CostFunction {
1512    private static final String REGION_REPLICA_HOST_COST_KEY =
1513        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1514    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1515
1516    long maxCost = 0;
1517    long[] costsPerGroup; // group is either server, host or rack
1518    int[][] primariesOfRegionsPerGroup;
1519
1520    public RegionReplicaHostCostFunction(Configuration conf) {
1521      super(conf);
1522      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1523        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1524    }
1525
1526    @Override
1527    void init(Cluster cluster) {
1528      super.init(cluster);
1529      // max cost is the case where every region replica is hosted together regardless of host
1530      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1531      costsPerGroup = new long[cluster.numHosts];
1532      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1533          ? cluster.primariesOfRegionsPerHost
1534          : cluster.primariesOfRegionsPerServer;
1535      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1536        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1537      }
1538    }
1539
1540    long getMaxCost(Cluster cluster) {
1541      if (!cluster.hasRegionReplicas) {
1542        return 0; // short circuit
1543      }
1544      // max cost is the case where every region replica is hosted together regardless of host
1545      int[] primariesOfRegions = new int[cluster.numRegions];
1546      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1547          cluster.regions.length);
1548
1549      Arrays.sort(primariesOfRegions);
1550
1551      // compute numReplicas from the sorted array
1552      return costPerGroup(primariesOfRegions);
1553    }
1554
1555    @Override
1556    boolean isNeeded() {
1557      return cluster.hasRegionReplicas;
1558    }
1559
1560    @Override
1561    double cost() {
1562      if (maxCost <= 0) {
1563        return 0;
1564      }
1565
1566      long totalCost = 0;
1567      for (int i = 0 ; i < costsPerGroup.length; i++) {
1568        totalCost += costsPerGroup[i];
1569      }
1570      return scale(0, maxCost, totalCost);
1571    }
1572
1573    /**
1574     * For each primary region, it computes the total number of replicas in the array (numReplicas)
1575     * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1576     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1577     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1578     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1579     * @return a sum of numReplicas-1 squared for each primary region in the group.
1580     */
1581    protected long costPerGroup(int[] primariesOfRegions) {
1582      long cost = 0;
1583      int currentPrimary = -1;
1584      int currentPrimaryIndex = -1;
1585      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1586      // sharing the same primary will have consecutive numbers in the array.
1587      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1588        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1589        if (primary != currentPrimary) { // we see a new primary
1590          int numReplicas = j - currentPrimaryIndex;
1591          // square the cost
1592          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1593            cost += (numReplicas - 1) * (numReplicas - 1);
1594          }
1595          currentPrimary = primary;
1596          currentPrimaryIndex = j;
1597        }
1598      }
1599
1600      return cost;
1601    }
1602
1603    @Override
1604    protected void regionMoved(int region, int oldServer, int newServer) {
1605      if (maxCost <= 0) {
1606        return; // no need to compute
1607      }
1608      if (cluster.multiServersPerHost) {
1609        int oldHost = cluster.serverIndexToHostIndex[oldServer];
1610        int newHost = cluster.serverIndexToHostIndex[newServer];
1611        if (newHost != oldHost) {
1612          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1613          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1614        }
1615      } else {
1616        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1617        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1618      }
1619    }
1620  }
1621
1622  /**
1623   * A cost function for region replicas for the rack distribution. We give a relatively high
1624   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1625   * though.
1626   */
1627  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1628    private static final String REGION_REPLICA_RACK_COST_KEY =
1629        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1630    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1631
1632    public RegionReplicaRackCostFunction(Configuration conf) {
1633      super(conf);
1634      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1635        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1636    }
1637
1638    @Override
1639    void init(Cluster cluster) {
1640      this.cluster = cluster;
1641      if (cluster.numRacks <= 1) {
1642        maxCost = 0;
1643        return; // disabled for 1 rack
1644      }
1645      // max cost is the case where every region replica is hosted together regardless of rack
1646      maxCost = getMaxCost(cluster);
1647      costsPerGroup = new long[cluster.numRacks];
1648      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1649        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1650      }
1651    }
1652
1653    @Override
1654    protected void regionMoved(int region, int oldServer, int newServer) {
1655      if (maxCost <= 0) {
1656        return; // no need to compute
1657      }
1658      int oldRack = cluster.serverIndexToRackIndex[oldServer];
1659      int newRack = cluster.serverIndexToRackIndex[newServer];
1660      if (newRack != oldRack) {
1661        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1662        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1663      }
1664    }
1665  }
1666
1667  /**
1668   * Compute the cost of total memstore size.  The more unbalanced the higher the
1669   * computed cost will be.  This uses a rolling average of regionload.
1670   */
1671  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
1672
1673    private static final String MEMSTORE_SIZE_COST_KEY =
1674        "hbase.master.balancer.stochastic.memstoreSizeCost";
1675    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1676
1677    MemStoreSizeCostFunction(Configuration conf) {
1678      super(conf);
1679      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1680    }
1681
1682    @Override
1683    protected double getCostFromRl(BalancerRegionLoad rl) {
1684      return rl.getMemStoreSizeMB();
1685    }
1686  }
1687  /**
1688   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1689   * computed cost will be.  This uses a rolling average of regionload.
1690   */
1691  static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1692
1693    private static final String STOREFILE_SIZE_COST_KEY =
1694        "hbase.master.balancer.stochastic.storefileSizeCost";
1695    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1696
1697    StoreFileCostFunction(Configuration conf) {
1698      super(conf);
1699      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1700    }
1701
1702    @Override
1703    protected double getCostFromRl(BalancerRegionLoad rl) {
1704      return rl.getStorefileSizeMB();
1705    }
1706  }
1707
1708  /**
1709   * A helper function to compose the attribute name from tablename and costfunction name
1710   */
1711  public static String composeAttributeName(String tableName, String costFunctionName) {
1712    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1713  }
1714}