001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Random;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.HBaseInterfaceAudience;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.RegionMetrics;
036import org.apache.hadoop.hbase.ServerMetrics;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.master.MasterServices;
041import org.apache.hadoop.hbase.master.RegionPlan;
042import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
054import org.apache.hbase.thirdparty.com.google.common.base.Optional;
055import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
056
057
058/**
 * <p>This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try and mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
062 * <ul>
063 * <li>Region Load</li>
064 * <li>Table Load</li>
065 * <li>Data Locality</li>
066 * <li>Memstore Sizes</li>
067 * <li>Storefile Sizes</li>
068 * </ul>
069 *
070 *
071 * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
072 * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
073 * scaled by their respective multipliers:</p>
074 *
075 * <ul>
076 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
077 *   <li>hbase.master.balancer.stochastic.moveCost</li>
078 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
079 *   <li>hbase.master.balancer.stochastic.localityCost</li>
080 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
081 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
082 * </ul>
083 *
084 * <p>In addition to the above configurations, the balancer can be tuned by the following
085 * configuration values:</p>
086 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
090 *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
091 *   regions is multiplied to try and get the number of times the balancer will
092 *   mutate all servers.</li>
093 *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
094 *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
095 *   value and the above computation.</li>
096 * </ul>
097 *
098 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
099 * so that the balancer gets the full picture of all loads on the cluster.</p>
100 */
101@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
102@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
103  justification="Complaint is about costFunctions not being synchronized; not end of the world")
104public class StochasticLoadBalancer extends BaseLoadBalancer {
105
  // Configuration keys read in setConf(). Each is looked up with the current field value as
  // the fallback.
  // Coefficient multiplied by (#regions * #servers) to derive the stochastic walk length.
  protected static final String STEPS_PER_REGION_KEY =
      "hbase.master.balancer.stochastic.stepsPerRegion";
  // Cap on the walk length (or floor, when runMaxSteps is true).
  protected static final String MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.maxSteps";
  // When true the walk runs max(maxSteps, calculated steps) instead of the minimum.
  protected static final String RUN_MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.runMaxSteps";
  // Wall-clock limit for one balanceCluster walk, compared against elapsed milliseconds.
  protected static final String MAX_RUNNING_TIME_KEY =
      "hbase.master.balancer.stochastic.maxRunningTime";
  // Size of the rolling window of BalancerRegionLoad samples kept per region.
  protected static final String KEEP_REGION_LOADS =
      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  // NOTE(review): not referenced in the visible part of this file.
  private static final String TABLE_FUNCTION_SEP = "_";
  // Weighted mean cost below which needsBalance() reports the cluster as balanced.
  protected static final String MIN_COST_NEED_BALANCE_KEY =
      "hbase.master.balancer.stochastic.minCostNeedBalance";

  // Shared source of randomness for generator selection and candidate picking.
  protected static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Region name (RegionInfo.getRegionNameAsString) -> most recent load samples, oldest first.
  // Rebuilt on every updateRegionLoad() call.
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // values are defaults
  private int maxSteps = 1000000;
  private boolean runMaxSteps = false;
  private int stepsPerRegion = 800;
  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
  private int numRegionLoadsToRemember = 15;
  private float minCostNeedBalance = 0.05f;

  // Only built by setConf() when still null, so setCandidateGenerators() can override it.
  private List<CandidateGenerator> candidateGenerators;
  private CostFromRegionLoadFunction[] regionLoadFunctions;
  // Order of this array defines the index used in tempFunctionCosts / curFunctionCosts.
  private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

  // to save and report costs to JMX
  // Overall cost of the best state found so far in the current walk.
  private Double curOverallCost = 0d;
  // Scratch per-function weighted costs written by computeCost().
  private Double[] tempFunctionCosts;
  // Per-function weighted costs of the best state found so far.
  private Double[] curFunctionCosts;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
  private boolean isByTable = false;
  // Table passed to balanceCluster(TableName, Map); used to tag JMX cost metrics.
  private TableName tableName = null;
151
  /**
   * The constructor that passes a MetricsStochasticBalancer to BaseLoadBalancer to replace its
   * default MetricsBalancer. The stochastic metrics type is what updateMetricsSize and
   * updateStochasticCosts check for before reporting per-function costs.
   */
  public StochasticLoadBalancer() {
    super(new MetricsStochasticBalancer());
  }
159
160  @Override
161  public void onConfigurationChange(Configuration conf) {
162    setConf(conf);
163  }
164
  /**
   * Reads the balancer tuning from {@code conf} and (re)builds the cost functions.
   *
   * <p>Each tunable is read with its current field value as the fallback. Keys absent from
   * {@code conf} therefore keep whatever value was previously in effect (initially the field
   * defaults). Also invoked from {@link #onConfigurationChange(Configuration)}.
   *
   * @param conf configuration to read from
   */
  @Override
  public synchronized void setConf(Configuration conf) {
    super.setConf(conf);
    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);

    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
    isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
    // Created only once and reused afterwards; setMasterServices() updates this same instance.
    if (localityCandidateGenerator == null) {
      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
    }
    // Locality cost functions are rebuilt on every call and kept in fields so that
    // balanceCluster() and setMasterServices() can reach them directly.
    localityCost = new ServerLocalityCostFunction(conf, services);
    rackLocalityCost = new RackLocalityCostFunction(conf, services);

    // Default generators are only installed when none were provided, e.g. through
    // setCandidateGenerators().
    if (this.candidateGenerators == null) {
      candidateGenerators = Lists.newArrayList();
      candidateGenerators.add(new RandomCandidateGenerator());
      candidateGenerators.add(new LoadCandidateGenerator());
      candidateGenerators.add(localityCandidateGenerator);
      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
    }
    // NOTE(review): these are fresh instances. Loads and cluster metrics are only pushed to them
    // in setClusterMetrics()/updateRegionLoad(), so after a runtime reconfiguration they hold
    // none until the next setClusterMetrics() call. Confirm this is acceptable.
    regionLoadFunctions = new CostFromRegionLoadFunction[] {
      new ReadRequestCostFunction(conf),
      new WriteRequestCostFunction(conf),
      new MemStoreSizeCostFunction(conf),
      new StoreFileCostFunction(conf)
    };
    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
    // The position of each function here is its index into tempFunctionCosts/curFunctionCosts
    // and into the names returned by getCostFunctionNames().
    costFunctions = new CostFunction[]{
      new RegionCountSkewCostFunction(conf),
      new PrimaryRegionCountSkewCostFunction(conf),
      new MoveCostFunction(conf),
      localityCost,
      rackLocalityCost,
      new TableSkewCostFunction(conf),
      regionReplicaHostCostFunction,
      regionReplicaRackCostFunction,
      regionLoadFunctions[0],
      regionLoadFunctions[1],
      regionLoadFunctions[2],
      regionLoadFunctions[3],
    };
    curFunctionCosts= new Double[costFunctions.length];
    tempFunctionCosts= new Double[costFunctions.length];
    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
        ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
  }
216
217  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
218    this.candidateGenerators = customCandidateGenerators;
219  }
220
221  @Override
222  protected void setSlop(Configuration conf) {
223    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
224  }
225
226  @Override
227  public synchronized void setClusterMetrics(ClusterMetrics st) {
228    super.setClusterMetrics(st);
229    updateRegionLoad();
230    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
231      cost.setClusterMetrics(st);
232    }
233
234    // update metrics size
235    try {
236      // by-table or ensemble mode
237      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
238      int functionsCount = getCostFunctionNames().length;
239
240      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
241    } catch (Exception e) {
242      LOG.error("failed to get the size of all tables", e);
243    }
244  }
245
246  /**
247   * Update the number of metrics that are reported to JMX
248   */
249  public void updateMetricsSize(int size) {
250    if (metricsBalancer instanceof MetricsStochasticBalancer) {
251        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
252    }
253  }
254
255  @Override
256  public synchronized void setMasterServices(MasterServices masterServices) {
257    super.setMasterServices(masterServices);
258    this.localityCost.setServices(masterServices);
259    this.rackLocalityCost.setServices(masterServices);
260    this.localityCandidateGenerator.setServices(masterServices);
261  }
262
263  @Override
264  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
265    regionReplicaHostCostFunction.init(c);
266    if (regionReplicaHostCostFunction.cost() > 0) return true;
267    regionReplicaRackCostFunction.init(c);
268    if (regionReplicaRackCostFunction.cost() > 0) return true;
269    return false;
270  }
271
272  @Override
273  protected boolean needsBalance(Cluster cluster) {
274    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
275    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
276      if (LOG.isDebugEnabled()) {
277        LOG.debug("Not running balancer because only " + cs.getNumServers()
278            + " active regionserver(s)");
279      }
280      return false;
281    }
282    if (areSomeRegionReplicasColocated(cluster)) {
283      return true;
284    }
285
286    double total = 0.0;
287    float sumMultiplier = 0.0f;
288    for (CostFunction c : costFunctions) {
289      float multiplier = c.getMultiplier();
290      if (multiplier <= 0) {
291        continue;
292      }
293      if (!c.isNeeded()) {
294        LOG.debug("{} not needed", c.getClass().getSimpleName());
295        continue;
296      }
297      sumMultiplier += multiplier;
298      total += c.cost() * multiplier;
299    }
300
301    if (total <= 0 || sumMultiplier <= 0
302        || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) {
303      if (LOG.isTraceEnabled()) {
304        LOG.trace("Skipping load balancing because balanced cluster; " + "total cost is " + total
305          + ", sum multiplier is " + sumMultiplier + " min cost which need balance is "
306          + minCostNeedBalance);
307      }
308      return false;
309    }
310    return true;
311  }
312
313  @Override
314  public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
315    List<RegionInfo>> clusterState) {
316    this.tableName = tableName;
317    return balanceCluster(clusterState);
318  }
319
320  @VisibleForTesting
321  Cluster.Action nextAction(Cluster cluster) {
322    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
323            .generate(cluster);
324  }
325
326  /**
327   * Given the cluster state this will try and approach an optimal balance. This
328   * should always approach the optimal state given enough steps.
329   */
330  @Override
331  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
332    List<RegionInfo>> clusterState) {
333    List<RegionPlan> plans = balanceMasterRegions(clusterState);
334    if (plans != null || clusterState == null || clusterState.size() <= 1) {
335      return plans;
336    }
337
338    if (masterServerName != null && clusterState.containsKey(masterServerName)) {
339      if (clusterState.size() <= 2) {
340        return null;
341      }
342      clusterState = new HashMap<>(clusterState);
343      clusterState.remove(masterServerName);
344    }
345
346    // On clusters with lots of HFileLinks or lots of reference files,
347    // instantiating the storefile infos can be quite expensive.
348    // Allow turning this feature off if the locality cost is not going to
349    // be used in any computations.
350    RegionLocationFinder finder = null;
351    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
352        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
353      finder = this.regionFinder;
354    }
355
356    //The clusterState that is given to this method contains the state
357    //of all the regions in the table(s) (that's true today)
358    // Keep track of servers to iterate through them.
359    Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
360
361    long startTime = EnvironmentEdgeManager.currentTime();
362
363    initCosts(cluster);
364
365    if (!needsBalance(cluster)) {
366      return null;
367    }
368
369    double currentCost = computeCost(cluster, Double.MAX_VALUE);
370    curOverallCost = currentCost;
371    for (int i = 0; i < this.curFunctionCosts.length; i++) {
372      curFunctionCosts[i] = tempFunctionCosts[i];
373    }
374    double initCost = currentCost;
375    double newCost = currentCost;
376
377    long computedMaxSteps;
378    if (runMaxSteps) {
379      computedMaxSteps = Math.max(this.maxSteps,
380          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
381    } else {
382      long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion *
383          (long)cluster.numServers;
384      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
385      if (calculatedMaxSteps > maxSteps) {
386        LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
387            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
388            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
389            + "(This config change does not require service restart)", calculatedMaxSteps,
390            maxRunningTime);
391      }
392    }
393    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
394        + functionCost() + " computedMaxSteps: " + computedMaxSteps);
395
396    // Perform a stochastic walk to see if we can get a good fit.
397    long step;
398
399    for (step = 0; step < computedMaxSteps; step++) {
400      Cluster.Action action = nextAction(cluster);
401
402      if (action.type == Type.NULL) {
403        continue;
404      }
405
406      cluster.doAction(action);
407      updateCostsWithAction(cluster, action);
408
409      newCost = computeCost(cluster, currentCost);
410
411      // Should this be kept?
412      if (newCost < currentCost) {
413        currentCost = newCost;
414
415        // save for JMX
416        curOverallCost = currentCost;
417        for (int i = 0; i < this.curFunctionCosts.length; i++) {
418          curFunctionCosts[i] = tempFunctionCosts[i];
419        }
420      } else {
421        // Put things back the way they were before.
422        // TODO: undo by remembering old values
423        Action undoAction = action.undoAction();
424        cluster.doAction(undoAction);
425        updateCostsWithAction(cluster, undoAction);
426      }
427
428      if (EnvironmentEdgeManager.currentTime() - startTime >
429          maxRunningTime) {
430        break;
431      }
432    }
433    long endTime = EnvironmentEdgeManager.currentTime();
434
435    metricsBalancer.balanceCluster(endTime - startTime);
436
437    // update costs metrics
438    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
439    if (initCost > currentCost) {
440      plans = createRegionPlans(cluster);
441      LOG.info("Finished computing new load balance plan. Computation took {}" +
442        " to try {} different iterations.  Found a solution that moves " +
443        "{} regions; Going from a computed cost of {}" +
444        " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime),
445        step, plans.size(), initCost, currentCost);
446      return plans;
447    }
448    LOG.info("Could not find a better load balance plan.  Tried {} different configurations in " +
449      "{}, and did not find anything with a computed cost less than {}", step,
450      java.time.Duration.ofMillis(endTime - startTime), initCost);
451    return null;
452  }
453
454  /**
455   * update costs to JMX
456   */
457  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
458    if (tableName == null) return;
459
460    // check if the metricsBalancer is MetricsStochasticBalancer before casting
461    if (metricsBalancer instanceof MetricsStochasticBalancer) {
462      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
463      // overall cost
464      balancer.updateStochasticCost(tableName.getNameAsString(),
465        "Overall", "Overall cost", overall);
466
467      // each cost function
468      for (int i = 0; i < costFunctions.length; i++) {
469        CostFunction costFunction = costFunctions[i];
470        String costFunctionName = costFunction.getClass().getSimpleName();
471        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
472        // TODO: cost function may need a specific description
473        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
474          "The percent of " + costFunctionName, costPercent);
475      }
476    }
477  }
478
479  private String functionCost() {
480    StringBuilder builder = new StringBuilder();
481    for (CostFunction c:costFunctions) {
482      builder.append(c.getClass().getSimpleName());
483      builder.append(" : (");
484      builder.append(c.getMultiplier());
485      builder.append(", ");
486      builder.append(c.cost());
487      builder.append("); ");
488    }
489    return builder.toString();
490  }
491
492  /**
493   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
494   * state.
495   *
496   * @param cluster The state of the cluster
497   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
498   */
499  private List<RegionPlan> createRegionPlans(Cluster cluster) {
500    List<RegionPlan> plans = new LinkedList<>();
501    for (int regionIndex = 0;
502         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
503      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
504      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
505
506      if (initialServerIndex != newServerIndex) {
507        RegionInfo region = cluster.regions[regionIndex];
508        ServerName initialServer = cluster.servers[initialServerIndex];
509        ServerName newServer = cluster.servers[newServerIndex];
510
511        if (LOG.isTraceEnabled()) {
512          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
513              + initialServer.getHostname() + " to " + newServer.getHostname());
514        }
515        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
516        plans.add(rp);
517      }
518    }
519    return plans;
520  }
521
522  /**
523   * Store the current region loads.
524   */
525  private synchronized void updateRegionLoad() {
526    // We create a new hashmap so that regions that are no longer there are removed.
527    // However we temporarily need the old loads so we can use them to keep the rolling average.
528    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
529    loads = new HashMap<>();
530
531    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
532      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
533        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
534        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
535        if (rLoads == null) {
536          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
537        } else if (rLoads.size() >= numRegionLoadsToRemember) {
538          rLoads.remove();
539        }
540        rLoads.add(new BalancerRegionLoad(rm));
541        loads.put(regionNameAsString, rLoads);
542      });
543    });
544
545    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
546      cost.setLoads(loads);
547    }
548  }
549
550  protected void initCosts(Cluster cluster) {
551    for (CostFunction c:costFunctions) {
552      c.init(cluster);
553    }
554  }
555
556  protected void updateCostsWithAction(Cluster cluster, Action action) {
557    for (CostFunction c : costFunctions) {
558      c.postAction(action);
559    }
560  }
561
562  /**
563   * Get the names of the cost functions
564   */
565  public String[] getCostFunctionNames() {
566    if (costFunctions == null) return null;
567    String[] ret = new String[costFunctions.length];
568    for (int i = 0; i < costFunctions.length; i++) {
569      CostFunction c = costFunctions[i];
570      ret[i] = c.getClass().getSimpleName();
571    }
572
573    return ret;
574  }
575
576  /**
577   * This is the main cost function.  It will compute a cost associated with a proposed cluster
578   * state.  All different costs will be combined with their multipliers to produce a double cost.
579   *
580   * @param cluster The state of the cluster
581   * @param previousCost the previous cost. This is used as an early out.
582   * @return a double of a cost associated with the proposed cluster state.  This cost is an
583   *         aggregate of all individual cost functions.
584   */
585  protected double computeCost(Cluster cluster, double previousCost) {
586    double total = 0;
587
588    for (int i = 0; i < costFunctions.length; i++) {
589      CostFunction c = costFunctions[i];
590      this.tempFunctionCosts[i] = 0.0;
591
592      if (c.getMultiplier() <= 0) {
593        continue;
594      }
595
596      Float multiplier = c.getMultiplier();
597      Double cost = c.cost();
598
599      this.tempFunctionCosts[i] = multiplier*cost;
600      total += this.tempFunctionCosts[i];
601
602      if (total > previousCost) {
603        break;
604      }
605    }
606
607    return total;
608  }
609
610  /** Generates a candidate action to be applied to the cluster for cost function search */
611  abstract static class CandidateGenerator {
612    abstract Cluster.Action generate(Cluster cluster);
613
614    /**
615     * From a list of regions pick a random one. Null can be returned which
616     * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
617     * rather than swap.
618     *
619     * @param cluster        The state of the cluster
620     * @param server         index of the server
621     * @param chanceOfNoSwap Chance that this will decide to try a move rather
622     *                       than a swap.
623     * @return a random {@link RegionInfo} or null if an asymmetrical move is
624     *         suggested.
625     */
626    protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
627      // Check to see if this is just a move.
628      if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
629        // signal a move only.
630        return -1;
631      }
632      int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
633      return cluster.regionsPerServer[server][rand];
634
635    }
636    protected int pickRandomServer(Cluster cluster) {
637      if (cluster.numServers < 1) {
638        return -1;
639      }
640
641      return RANDOM.nextInt(cluster.numServers);
642    }
643
644    protected int pickRandomRack(Cluster cluster) {
645      if (cluster.numRacks < 1) {
646        return -1;
647      }
648
649      return RANDOM.nextInt(cluster.numRacks);
650    }
651
652    protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
653      if (cluster.numServers < 2) {
654        return -1;
655      }
656      while (true) {
657        int otherServerIndex = pickRandomServer(cluster);
658        if (otherServerIndex != serverIndex) {
659          return otherServerIndex;
660        }
661      }
662    }
663
664    protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
665      if (cluster.numRacks < 2) {
666        return -1;
667      }
668      while (true) {
669        int otherRackIndex = pickRandomRack(cluster);
670        if (otherRackIndex != rackIndex) {
671          return otherRackIndex;
672        }
673      }
674    }
675
676    protected Cluster.Action pickRandomRegions(Cluster cluster,
677                                                       int thisServer,
678                                                       int otherServer) {
679      if (thisServer < 0 || otherServer < 0) {
680        return Cluster.NullAction;
681      }
682
683      // Decide who is most likely to need another region
684      int thisRegionCount = cluster.getNumRegions(thisServer);
685      int otherRegionCount = cluster.getNumRegions(otherServer);
686
687      // Assign the chance based upon the above
688      double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
689      double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
690
691      int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
692      int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
693
694      return getAction(thisServer, thisRegion, otherServer, otherRegion);
695    }
696
697    protected Cluster.Action getAction(int fromServer, int fromRegion,
698        int toServer, int toRegion) {
699      if (fromServer < 0 || toServer < 0) {
700        return Cluster.NullAction;
701      }
702      if (fromRegion > 0 && toRegion > 0) {
703        return new Cluster.SwapRegionsAction(fromServer, fromRegion,
704          toServer, toRegion);
705      } else if (fromRegion > 0) {
706        return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
707      } else if (toRegion > 0) {
708        return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
709      } else {
710        return Cluster.NullAction;
711      }
712    }
713
714    /**
715     * Returns a random iteration order of indexes of an array with size length
716     */
717    protected List<Integer> getRandomIterationOrder(int length) {
718      ArrayList<Integer> order = new ArrayList<>(length);
719      for (int i = 0; i < length; i++) {
720        order.add(i);
721      }
722      Collections.shuffle(order);
723      return order;
724    }
725  }
726
727  static class RandomCandidateGenerator extends CandidateGenerator {
728
729    @Override
730    Cluster.Action generate(Cluster cluster) {
731
732      int thisServer = pickRandomServer(cluster);
733
734      // Pick the other server
735      int otherServer = pickOtherRandomServer(cluster, thisServer);
736
737      return pickRandomRegions(cluster, thisServer, otherServer);
738    }
739  }
740
741  static class LoadCandidateGenerator extends CandidateGenerator {
742
743    @Override
744    Cluster.Action generate(Cluster cluster) {
745      cluster.sortServersByRegionCount();
746      int thisServer = pickMostLoadedServer(cluster, -1);
747      int otherServer = pickLeastLoadedServer(cluster, thisServer);
748
749      return pickRandomRegions(cluster, thisServer, otherServer);
750    }
751
752    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
753      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
754
755      int index = 0;
756      while (servers[index] == null || servers[index] == thisServer) {
757        index++;
758        if (index == servers.length) {
759          return -1;
760        }
761      }
762      return servers[index];
763    }
764
765    private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
766      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
767
768      int index = servers.length - 1;
769      while (servers[index] == null || servers[index] == thisServer) {
770        index--;
771        if (index < 0) {
772          return -1;
773        }
774      }
775      return servers[index];
776    }
777  }
778
779  static class LocalityBasedCandidateGenerator extends CandidateGenerator {
780
781    private MasterServices masterServices;
782
783    LocalityBasedCandidateGenerator(MasterServices masterServices) {
784      this.masterServices = masterServices;
785    }
786
787    @Override
788    Cluster.Action generate(Cluster cluster) {
789      if (this.masterServices == null) {
790        int thisServer = pickRandomServer(cluster);
791        // Pick the other server
792        int otherServer = pickOtherRandomServer(cluster, thisServer);
793        return pickRandomRegions(cluster, thisServer, otherServer);
794      }
795
796      // Randomly iterate through regions until you find one that is not on ideal host
797      for (int region : getRandomIterationOrder(cluster.numRegions)) {
798        int currentServer = cluster.regionIndexToServerIndex[region];
799        if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
800          Optional<Action> potential = tryMoveOrSwap(
801              cluster,
802              currentServer,
803              region,
804              cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
805          );
806          if (potential.isPresent()) {
807            return potential.get();
808          }
809        }
810      }
811      return Cluster.NullAction;
812    }
813
814    /**
815     * Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
816     * Returns empty optional if no move can be found
817     */
818    private Optional<Action> tryMoveOrSwap(Cluster cluster,
819                                           int fromServer,
820                                           int fromRegion,
821                                           int toServer) {
822      // Try move first. We know apriori fromRegion has the highest locality on toServer
823      if (cluster.serverHasTooFewRegions(toServer)) {
824        return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
825      }
826
827      // Compare locality gain/loss from swapping fromRegion with regions on toServer
828      double fromRegionLocalityDelta =
829          getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
830      for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
831        int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
832        double toRegionLocalityDelta =
833            getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
834        // If locality would remain neutral or improve, attempt the swap
835        if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
836          return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
837        }
838      }
839
840      return Optional.absent();
841    }
842
843    private double getWeightedLocality(Cluster cluster, int region, int server) {
844      return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
845    }
846
847    void setServices(MasterServices services) {
848      this.masterServices = services;
849    }
850  }
851
852  /**
853   * Generates candidates which moves the replicas out of the region server for
854   * co-hosted region replicas
855   */
856  static class RegionReplicaCandidateGenerator extends CandidateGenerator {
857
858    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
859
860    /**
861     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
862     * (a group is a server, host or rack)
863     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
864     * primariesOfRegionsPerHost or primariesOfRegionsPerRack
865     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
866     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
867     * @return a regionIndex for the selected primary or -1 if there is no co-locating
868     */
869    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
870        , int[] regionIndexToPrimaryIndex) {
871      int currentPrimary = -1;
872      int currentPrimaryIndex = -1;
873      int selectedPrimaryIndex = -1;
874      double currentLargestRandom = -1;
875      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
876      // ids for the regions hosted in server, a consecutive repetition means that replicas
877      // are co-hosted
878      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
879        int primary = j < primariesOfRegionsPerGroup.length
880            ? primariesOfRegionsPerGroup[j] : -1;
881        if (primary != currentPrimary) { // check for whether we see a new primary
882          int numReplicas = j - currentPrimaryIndex;
883          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
884            // decide to select this primary region id or not
885            double currentRandom = RANDOM.nextDouble();
886            // we don't know how many region replicas are co-hosted, we will randomly select one
887            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
888            if (currentRandom > currentLargestRandom) {
889              selectedPrimaryIndex = currentPrimary;
890              currentLargestRandom = currentRandom;
891            }
892          }
893          currentPrimary = primary;
894          currentPrimaryIndex = j;
895        }
896      }
897
898      // we have found the primary id for the region to move. Now find the actual regionIndex
899      // with the given primary, prefer to move the secondary region.
900      for (int j = 0; j < regionsPerGroup.length; j++) {
901        int regionIndex = regionsPerGroup[j];
902        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
903          // always move the secondary, not the primary
904          if (selectedPrimaryIndex != regionIndex) {
905            return regionIndex;
906          }
907        }
908      }
909      return -1;
910    }
911
912    @Override
913    Cluster.Action generate(Cluster cluster) {
914      int serverIndex = pickRandomServer(cluster);
915      if (cluster.numServers <= 1 || serverIndex == -1) {
916        return Cluster.NullAction;
917      }
918
919      int regionIndex = selectCoHostedRegionPerGroup(
920        cluster.primariesOfRegionsPerServer[serverIndex],
921        cluster.regionsPerServer[serverIndex],
922        cluster.regionIndexToPrimaryIndex);
923
924      // if there are no pairs of region replicas co-hosted, default to random generator
925      if (regionIndex == -1) {
926        // default to randompicker
927        return randomGenerator.generate(cluster);
928      }
929
930      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
931      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
932      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
933    }
934  }
935
936  /**
937   * Generates candidates which moves the replicas out of the rack for
938   * co-hosted region replicas in the same rack
939   */
940  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
941    @Override
942    Cluster.Action generate(Cluster cluster) {
943      int rackIndex = pickRandomRack(cluster);
944      if (cluster.numRacks <= 1 || rackIndex == -1) {
945        return super.generate(cluster);
946      }
947
948      int regionIndex = selectCoHostedRegionPerGroup(
949        cluster.primariesOfRegionsPerRack[rackIndex],
950        cluster.regionsPerRack[rackIndex],
951        cluster.regionIndexToPrimaryIndex);
952
953      // if there are no pairs of region replicas co-hosted, default to random generator
954      if (regionIndex == -1) {
955        // default to randompicker
956        return randomGenerator.generate(cluster);
957      }
958
959      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
960      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
961
962      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
963      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
964      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
965      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
966    }
967  }
968
969  /**
970   * Base class of StochasticLoadBalancer's Cost Functions.
971   */
972  abstract static class CostFunction {
973
974    private float multiplier = 0;
975
976    protected Cluster cluster;
977
978    CostFunction(Configuration c) {
979    }
980
981    boolean isNeeded() {
982      return true;
983    }
984    float getMultiplier() {
985      return multiplier;
986    }
987
988    void setMultiplier(float m) {
989      this.multiplier = m;
990    }
991
992    /** Called once per LB invocation to give the cost function
993     * to initialize it's state, and perform any costly calculation.
994     */
995    void init(Cluster cluster) {
996      this.cluster = cluster;
997    }
998
999    /** Called once per cluster Action to give the cost function
1000     * an opportunity to update it's state. postAction() is always
1001     * called at least once before cost() is called with the cluster
1002     * that this action is performed on. */
1003    void postAction(Action action) {
1004      switch (action.type) {
1005      case NULL: break;
1006      case ASSIGN_REGION:
1007        AssignRegionAction ar = (AssignRegionAction) action;
1008        regionMoved(ar.region, -1, ar.server);
1009        break;
1010      case MOVE_REGION:
1011        MoveRegionAction mra = (MoveRegionAction) action;
1012        regionMoved(mra.region, mra.fromServer, mra.toServer);
1013        break;
1014      case SWAP_REGIONS:
1015        SwapRegionsAction a = (SwapRegionsAction) action;
1016        regionMoved(a.fromRegion, a.fromServer, a.toServer);
1017        regionMoved(a.toRegion, a.toServer, a.fromServer);
1018        break;
1019      default:
1020        throw new RuntimeException("Uknown action:" + action.type);
1021      }
1022    }
1023
1024    protected void regionMoved(int region, int oldServer, int newServer) {
1025    }
1026
1027    abstract double cost();
1028
1029    /**
1030     * Function to compute a scaled cost using {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics}.
1031     * It assumes that this is a zero sum set of costs.  It assumes that the worst case
1032     * possible is all of the elements in one region server and the rest having 0.
1033     *
1034     * @param stats the costs
1035     * @return a scaled set of costs.
1036     */
1037    protected double costFromArray(double[] stats) {
1038      double totalCost = 0;
1039      double total = getSum(stats);
1040
1041      double count = stats.length;
1042      double mean = total/count;
1043
1044      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
1045      // a zero sum cost for this to make sense.
1046      double max = ((count - 1) * mean) + (total - mean);
1047
1048      // It's possible that there aren't enough regions to go around
1049      double min;
1050      if (count > total) {
1051        min = ((count - total) * mean) + ((1 - mean) * total);
1052      } else {
1053        // Some will have 1 more than everything else.
1054        int numHigh = (int) (total - (Math.floor(mean) * count));
1055        int numLow = (int) (count - numHigh);
1056
1057        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
1058
1059      }
1060      min = Math.max(0, min);
1061      for (int i=0; i<stats.length; i++) {
1062        double n = stats[i];
1063        double diff = Math.abs(mean - n);
1064        totalCost += diff;
1065      }
1066
1067      double scaled =  scale(min, max, totalCost);
1068      return scaled;
1069    }
1070
1071    private double getSum(double[] stats) {
1072      double total = 0;
1073      for(double s:stats) {
1074        total += s;
1075      }
1076      return total;
1077    }
1078
1079    /**
1080     * Scale the value between 0 and 1.
1081     *
1082     * @param min   Min value
1083     * @param max   The Max value
1084     * @param value The value to be scaled.
1085     * @return The scaled value.
1086     */
1087    protected double scale(double min, double max, double value) {
1088      if (max <= min || value <= min) {
1089        return 0;
1090      }
1091      if ((max - min) == 0) return 0;
1092
1093      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
1094    }
1095  }
1096
1097  /**
1098   * Given the starting state of the regions and a potential ending state
1099   * compute cost based upon the number of regions that have moved.
1100   */
1101  static class MoveCostFunction extends CostFunction {
1102    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
1103    private static final String MAX_MOVES_PERCENT_KEY =
1104        "hbase.master.balancer.stochastic.maxMovePercent";
1105    private static final float DEFAULT_MOVE_COST = 7;
1106    private static final int DEFAULT_MAX_MOVES = 600;
1107    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1108
1109    private final float maxMovesPercent;
1110
1111    MoveCostFunction(Configuration conf) {
1112      super(conf);
1113
1114      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
1115      // that large benefits are need to overcome the cost of a move.
1116      this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1117      // What percent of the number of regions a single run of the balancer can move.
1118      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1119    }
1120
1121    @Override
1122    double cost() {
1123      // Try and size the max number of Moves, but always be prepared to move some.
1124      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1125          DEFAULT_MAX_MOVES);
1126
1127      double moveCost = cluster.numMovedRegions;
1128
1129      // Don't let this single balance move more than the max moves.
1130      // This allows better scaling to accurately represent the actual cost of a move.
1131      if (moveCost > maxMoves) {
1132        return 1000000;   // return a number much greater than any of the other cost
1133      }
1134
1135      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
1136    }
1137  }
1138
1139  /**
1140   * Compute the cost of a potential cluster state from skew in number of
1141   * regions on a cluster.
1142   */
1143  static class RegionCountSkewCostFunction extends CostFunction {
1144    private static final String REGION_COUNT_SKEW_COST_KEY =
1145        "hbase.master.balancer.stochastic.regionCountCost";
1146    private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1147
1148    private double[] stats = null;
1149
1150    RegionCountSkewCostFunction(Configuration conf) {
1151      super(conf);
1152      // Load multiplier should be the greatest as it is the most general way to balance data.
1153      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1154    }
1155
1156    @Override
1157    double cost() {
1158      if (stats == null || stats.length != cluster.numServers) {
1159        stats = new double[cluster.numServers];
1160      }
1161
1162      for (int i =0; i < cluster.numServers; i++) {
1163        stats[i] = cluster.regionsPerServer[i].length;
1164      }
1165
1166      return costFromArray(stats);
1167    }
1168  }
1169
1170  /**
1171   * Compute the cost of a potential cluster state from skew in number of
1172   * primary regions on a cluster.
1173   */
1174  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1175    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1176        "hbase.master.balancer.stochastic.primaryRegionCountCost";
1177    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1178
1179    private double[] stats = null;
1180
1181    PrimaryRegionCountSkewCostFunction(Configuration conf) {
1182      super(conf);
1183      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1184      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1185        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1186    }
1187
1188    @Override
1189    double cost() {
1190      if (!cluster.hasRegionReplicas) {
1191        return 0;
1192      }
1193      if (stats == null || stats.length != cluster.numServers) {
1194        stats = new double[cluster.numServers];
1195      }
1196
1197      for (int i = 0; i < cluster.numServers; i++) {
1198        stats[i] = 0;
1199        for (int regionIdx : cluster.regionsPerServer[i]) {
1200          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1201            stats[i]++;
1202          }
1203        }
1204      }
1205
1206      return costFromArray(stats);
1207    }
1208  }
1209
1210  /**
1211   * Compute the cost of a potential cluster configuration based upon how evenly
1212   * distributed tables are.
1213   */
1214  static class TableSkewCostFunction extends CostFunction {
1215
1216    private static final String TABLE_SKEW_COST_KEY =
1217        "hbase.master.balancer.stochastic.tableSkewCost";
1218    private static final float DEFAULT_TABLE_SKEW_COST = 35;
1219
1220    TableSkewCostFunction(Configuration conf) {
1221      super(conf);
1222      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1223    }
1224
1225    @Override
1226    double cost() {
1227      double max = cluster.numRegions;
1228      double min = ((double) cluster.numRegions) / cluster.numServers;
1229      double value = 0;
1230
1231      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1232        value += cluster.numMaxRegionsPerTable[i];
1233      }
1234
1235      return scale(min, max, value);
1236    }
1237  }
1238
1239  /**
1240   * Compute a cost of a potential cluster configuration based upon where
1241   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
1242   */
1243  static abstract class LocalityBasedCostFunction extends CostFunction {
1244
1245    private final LocalityType type;
1246
1247    private double bestLocality; // best case locality across cluster weighted by local data size
1248    private double locality; // current locality across cluster weighted by local data size
1249
1250    private MasterServices services;
1251
1252    LocalityBasedCostFunction(Configuration conf,
1253                              MasterServices srv,
1254                              LocalityType type,
1255                              String localityCostKey,
1256                              float defaultLocalityCost) {
1257      super(conf);
1258      this.type = type;
1259      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1260      this.services = srv;
1261      this.locality = 0.0;
1262      this.bestLocality = 0.0;
1263    }
1264
1265    /**
1266     * Maps region to the current entity (server or rack) on which it is stored
1267     */
1268    abstract int regionIndexToEntityIndex(int region);
1269
1270    public void setServices(MasterServices srvc) {
1271      this.services = srvc;
1272    }
1273
1274    @Override
1275    void init(Cluster cluster) {
1276      super.init(cluster);
1277      locality = 0.0;
1278      bestLocality = 0.0;
1279
1280      // If no master, no computation will work, so assume 0 cost
1281      if (this.services == null) {
1282        return;
1283      }
1284
1285      for (int region = 0; region < cluster.numRegions; region++) {
1286        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1287        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1288      }
1289
1290      // We normalize locality to be a score between 0 and 1.0 representing how good it
1291      // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1292      // (and the cost is 0)
1293      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1294    }
1295
1296    @Override
1297    protected void regionMoved(int region, int oldServer, int newServer) {
1298      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1299      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1300      if (this.services == null) {
1301        return;
1302      }
1303      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1304      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1305      locality += normalizedDelta;
1306    }
1307
1308    @Override
1309    double cost() {
1310      return 1 - locality;
1311    }
1312
1313    private int getMostLocalEntityForRegion(int region) {
1314      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1315    }
1316
1317    private double getWeightedLocality(int region, int entity) {
1318      return cluster.getOrComputeWeightedLocality(region, entity, type);
1319    }
1320
1321  }
1322
1323  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1324
1325    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1326    private static final float DEFAULT_LOCALITY_COST = 25;
1327
1328    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1329      super(
1330          conf,
1331          srv,
1332          LocalityType.SERVER,
1333          LOCALITY_COST_KEY,
1334          DEFAULT_LOCALITY_COST
1335      );
1336    }
1337
1338    @Override
1339    int regionIndexToEntityIndex(int region) {
1340      return cluster.regionIndexToServerIndex[region];
1341    }
1342  }
1343
1344  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1345
1346    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1347    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1348
1349    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1350      super(
1351          conf,
1352          services,
1353          LocalityType.RACK,
1354          RACK_LOCALITY_COST_KEY,
1355          DEFAULT_RACK_LOCALITY_COST
1356      );
1357    }
1358
1359    @Override
1360    int regionIndexToEntityIndex(int region) {
1361      return cluster.getRackForRegion(region);
1362    }
1363  }
1364
1365  /**
1366   * Base class the allows writing costs functions from rolling average of some
1367   * number from RegionLoad.
1368   */
1369  abstract static class CostFromRegionLoadFunction extends CostFunction {
1370
1371    private ClusterMetrics clusterStatus = null;
1372    private Map<String, Deque<BalancerRegionLoad>> loads = null;
1373    private double[] stats = null;
1374    CostFromRegionLoadFunction(Configuration conf) {
1375      super(conf);
1376    }
1377
1378    void setClusterMetrics(ClusterMetrics status) {
1379      this.clusterStatus = status;
1380    }
1381
1382    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1383      this.loads = l;
1384    }
1385
1386    @Override
1387    double cost() {
1388      if (clusterStatus == null || loads == null) {
1389        return 0;
1390      }
1391
1392      if (stats == null || stats.length != cluster.numServers) {
1393        stats = new double[cluster.numServers];
1394      }
1395
1396      for (int i =0; i < stats.length; i++) {
1397        //Cost this server has from RegionLoad
1398        long cost = 0;
1399
1400        // for every region on this server get the rl
1401        for(int regionIndex:cluster.regionsPerServer[i]) {
1402          Collection<BalancerRegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1403
1404          // Now if we found a region load get the type of cost that was requested.
1405          if (regionLoadList != null) {
1406            cost = (long) (cost + getRegionLoadCost(regionLoadList));
1407          }
1408        }
1409
1410        // Add the total cost to the stats.
1411        stats[i] = cost;
1412      }
1413
1414      // Now return the scaled cost from data held in the stats object.
1415      return costFromArray(stats);
1416    }
1417
1418    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1419      double cost = 0;
1420      for (BalancerRegionLoad rl : regionLoadList) {
1421        cost += getCostFromRl(rl);
1422      }
1423      return cost / regionLoadList.size();
1424    }
1425
1426    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1427  }
1428
1429  /**
1430   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
1431   * We do not compare about the actual rate in requests per second but rather the rate relative
1432   * to the rest of the regions.
1433   */
1434  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1435
1436    CostFromRegionLoadAsRateFunction(Configuration conf) {
1437      super(conf);
1438    }
1439
1440    @Override
1441    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1442      double cost = 0;
1443      double previous = 0;
1444      boolean isFirst = true;
1445      for (BalancerRegionLoad rl : regionLoadList) {
1446        double current = getCostFromRl(rl);
1447        if (isFirst) {
1448          isFirst = false;
1449        } else {
1450          cost += current - previous;
1451        }
1452        previous = current;
1453      }
1454      return Math.max(0, cost / (regionLoadList.size() - 1));
1455    }
1456  }
1457
1458  /**
1459   * Compute the cost of total number of read requests  The more unbalanced the higher the
1460   * computed cost will be.  This uses a rolling average of regionload.
1461   */
1462
1463  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1464
1465    private static final String READ_REQUEST_COST_KEY =
1466        "hbase.master.balancer.stochastic.readRequestCost";
1467    private static final float DEFAULT_READ_REQUEST_COST = 5;
1468
1469    ReadRequestCostFunction(Configuration conf) {
1470      super(conf);
1471      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1472    }
1473
1474    @Override
1475    protected double getCostFromRl(BalancerRegionLoad rl) {
1476      return rl.getReadRequestsCount();
1477    }
1478  }
1479
1480  /**
1481   * Compute the cost of total number of write requests.  The more unbalanced the higher the
1482   * computed cost will be.  This uses a rolling average of regionload.
1483   */
1484  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1485
1486    private static final String WRITE_REQUEST_COST_KEY =
1487        "hbase.master.balancer.stochastic.writeRequestCost";
1488    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1489
1490    WriteRequestCostFunction(Configuration conf) {
1491      super(conf);
1492      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1493    }
1494
1495    @Override
1496    protected double getCostFromRl(BalancerRegionLoad rl) {
1497      return rl.getWriteRequestsCount();
1498    }
1499  }
1500
1501  /**
1502   * A cost function for region replicas. We give a very high cost to hosting
1503   * replicas of the same region in the same host. We do not prevent the case
1504   * though, since if numReplicas > numRegionServers, we still want to keep the
1505   * replica open.
1506   */
1507  static class RegionReplicaHostCostFunction extends CostFunction {
1508    private static final String REGION_REPLICA_HOST_COST_KEY =
1509        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1510    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1511
1512    long maxCost = 0;
1513    long[] costsPerGroup; // group is either server, host or rack
1514    int[][] primariesOfRegionsPerGroup;
1515
1516    public RegionReplicaHostCostFunction(Configuration conf) {
1517      super(conf);
1518      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1519        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1520    }
1521
1522    @Override
1523    void init(Cluster cluster) {
1524      super.init(cluster);
1525      // max cost is the case where every region replica is hosted together regardless of host
1526      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1527      costsPerGroup = new long[cluster.numHosts];
1528      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1529          ? cluster.primariesOfRegionsPerHost
1530          : cluster.primariesOfRegionsPerServer;
1531      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1532        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1533      }
1534    }
1535
1536    long getMaxCost(Cluster cluster) {
1537      if (!cluster.hasRegionReplicas) {
1538        return 0; // short circuit
1539      }
1540      // max cost is the case where every region replica is hosted together regardless of host
1541      int[] primariesOfRegions = new int[cluster.numRegions];
1542      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1543          cluster.regions.length);
1544
1545      Arrays.sort(primariesOfRegions);
1546
1547      // compute numReplicas from the sorted array
1548      return costPerGroup(primariesOfRegions);
1549    }
1550
1551    @Override
1552    boolean isNeeded() {
1553      return cluster.hasRegionReplicas;
1554    }
1555
1556    @Override
1557    double cost() {
1558      if (maxCost <= 0) {
1559        return 0;
1560      }
1561
1562      long totalCost = 0;
1563      for (int i = 0 ; i < costsPerGroup.length; i++) {
1564        totalCost += costsPerGroup[i];
1565      }
1566      return scale(0, maxCost, totalCost);
1567    }
1568
1569    /**
1570     * For each primary region, it computes the total number of replicas in the array (numReplicas)
1571     * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1572     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1573     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1574     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1575     * @return a sum of numReplicas-1 squared for each primary region in the group.
1576     */
1577    protected long costPerGroup(int[] primariesOfRegions) {
1578      long cost = 0;
1579      int currentPrimary = -1;
1580      int currentPrimaryIndex = -1;
1581      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1582      // sharing the same primary will have consecutive numbers in the array.
1583      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1584        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1585        if (primary != currentPrimary) { // we see a new primary
1586          int numReplicas = j - currentPrimaryIndex;
1587          // square the cost
1588          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1589            cost += (numReplicas - 1) * (numReplicas - 1);
1590          }
1591          currentPrimary = primary;
1592          currentPrimaryIndex = j;
1593        }
1594      }
1595
1596      return cost;
1597    }
1598
1599    @Override
1600    protected void regionMoved(int region, int oldServer, int newServer) {
1601      if (maxCost <= 0) {
1602        return; // no need to compute
1603      }
1604      if (cluster.multiServersPerHost) {
1605        int oldHost = cluster.serverIndexToHostIndex[oldServer];
1606        int newHost = cluster.serverIndexToHostIndex[newServer];
1607        if (newHost != oldHost) {
1608          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1609          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1610        }
1611      } else {
1612        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1613        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1614      }
1615    }
1616  }
1617
1618  /**
1619   * A cost function for region replicas for the rack distribution. We give a relatively high
1620   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1621   * though.
1622   */
1623  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1624    private static final String REGION_REPLICA_RACK_COST_KEY =
1625        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1626    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1627
1628    public RegionReplicaRackCostFunction(Configuration conf) {
1629      super(conf);
1630      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1631        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1632    }
1633
1634    @Override
1635    void init(Cluster cluster) {
1636      this.cluster = cluster;
1637      if (cluster.numRacks <= 1) {
1638        maxCost = 0;
1639        return; // disabled for 1 rack
1640      }
1641      // max cost is the case where every region replica is hosted together regardless of rack
1642      maxCost = getMaxCost(cluster);
1643      costsPerGroup = new long[cluster.numRacks];
1644      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1645        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1646      }
1647    }
1648
1649    @Override
1650    protected void regionMoved(int region, int oldServer, int newServer) {
1651      if (maxCost <= 0) {
1652        return; // no need to compute
1653      }
1654      int oldRack = cluster.serverIndexToRackIndex[oldServer];
1655      int newRack = cluster.serverIndexToRackIndex[newServer];
1656      if (newRack != oldRack) {
1657        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1658        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1659      }
1660    }
1661  }
1662
1663  /**
1664   * Compute the cost of total memstore size.  The more unbalanced the higher the
1665   * computed cost will be.  This uses a rolling average of regionload.
1666   */
1667  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
1668
1669    private static final String MEMSTORE_SIZE_COST_KEY =
1670        "hbase.master.balancer.stochastic.memstoreSizeCost";
1671    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1672
1673    MemStoreSizeCostFunction(Configuration conf) {
1674      super(conf);
1675      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1676    }
1677
1678    @Override
1679    protected double getCostFromRl(BalancerRegionLoad rl) {
1680      return rl.getMemStoreSizeMB();
1681    }
1682  }
1683  /**
1684   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1685   * computed cost will be.  This uses a rolling average of regionload.
1686   */
1687  static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1688
1689    private static final String STOREFILE_SIZE_COST_KEY =
1690        "hbase.master.balancer.stochastic.storefileSizeCost";
1691    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1692
1693    StoreFileCostFunction(Configuration conf) {
1694      super(conf);
1695      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1696    }
1697
1698    @Override
1699    protected double getCostFromRl(BalancerRegionLoad rl) {
1700      return rl.getStorefileSizeMB();
1701    }
1702  }
1703
1704  /**
1705   * A helper function to compose the attribute name from tablename and costfunction name
1706   */
1707  public static String composeAttributeName(String tableName, String costFunctionName) {
1708    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1709  }
1710}