001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Random;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.HBaseInterfaceAudience;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.RegionMetrics;
036import org.apache.hadoop.hbase.ServerMetrics;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.master.MasterServices;
041import org.apache.hadoop.hbase.master.RegionPlan;
042import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
055import org.apache.hbase.thirdparty.com.google.common.base.Optional;
056import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
057
058
059/**
 * <p>This is a best-effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
063 * <ul>
064 * <li>Region Load</li>
065 * <li>Table Load</li>
066 * <li>Data Locality</li>
067 * <li>Memstore Sizes</li>
068 * <li>Storefile Sizes</li>
069 * </ul>
070 *
071 *
 * <p>Every cost function returns a number between 0 and 1 inclusive, where 0 is the lowest cost
 * (the best solution) and 1 is the highest possible cost (the worst solution). The computed costs
 * are scaled by their respective multipliers:</p>
075 *
076 * <ul>
077 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
078 *   <li>hbase.master.balancer.stochastic.moveCost</li>
079 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
080 *   <li>hbase.master.balancer.stochastic.localityCost</li>
081 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
082 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
083 * </ul>
084 *
085 * <p>In addition to the above configurations, the balancer can be tuned by the following
086 * configuration values:</p>
087 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions, which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
 *   <li>hbase.master.balancer.stochastic.stepsPerRegion, the coefficient by which the number of
 *   regions (times the number of servers) is multiplied to get the number of steps the balancer
 *   will try when mutating the cluster.</li>
 *   <li>hbase.master.balancer.stochastic.maxSteps, which controls the maximum number of steps
 *   the balancer will try. The balancer uses the minimum of this value and the above
 *   computation (or the maximum, when hbase.master.balancer.stochastic.runMaxSteps is true).</li>
097 * </ul>
098 *
099 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
100 * so that the balancer gets the full picture of all loads on the cluster.</p>
101 */
102@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
103@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
104  justification="Complaint is about costFunctions not being synchronized; not end of the world")
105public class StochasticLoadBalancer extends BaseLoadBalancer {
106
  /**
   * Config key for the per-region coefficient used to size the stochastic walk; the step budget
   * is numRegions * stepsPerRegion * numServers before being combined with maxSteps.
   */
  protected static final String STEPS_PER_REGION_KEY =
      "hbase.master.balancer.stochastic.stepsPerRegion";
  /** Config key for maxSteps: the cap on walk steps (or the floor, when runMaxSteps is set). */
  protected static final String MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.maxSteps";
  /** Config key: when true, the walk uses the larger of maxSteps and the computed budget. */
  protected static final String RUN_MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.runMaxSteps";
  /** Config key for the wall-clock limit on a single balancer walk. */
  protected static final String MAX_RUNNING_TIME_KEY =
      "hbase.master.balancer.stochastic.maxRunningTime";
  /** Config key for how many past load samples are remembered per region. */
  protected static final String KEEP_REGION_LOADS =
      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  // NOTE(review): not referenced in the part of this file reviewed here; may be used further down.
  private static final String TABLE_FUNCTION_SEP = "_";
  /**
   * Config key for the normalized (multiplier-weighted average) cost below which needsBalance
   * treats the cluster as already balanced.
   */
  protected static final String MIN_COST_NEED_BALANCE_KEY =
      "hbase.master.balancer.stochastic.minCostNeedBalance";

  /** Shared randomness for choosing candidate generators, servers and regions. */
  protected static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Recent load samples keyed by Bytes.toString(regionName). Replaced wholesale on every
  // updateRegionLoad so regions that are no longer reported are dropped.
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // values are defaults; setConf overrides each one when its configuration key is present
  private int maxSteps = 1000000;
  private boolean runMaxSteps = false;
  private int stepsPerRegion = 800;
  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
  private int numRegionLoadsToRemember = 15;
  private float minCostNeedBalance = 0.05f;

  // Generators that propose the next random action; one is picked uniformly per step.
  private List<CandidateGenerator> candidateGenerators;
  // The subset of cost functions that consume region load history (see updateRegionLoad).
  private CostFromRegionLoadFunction[] regionLoadFunctions;
  private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

  // to save and report costs to JMX
  private Double curOverallCost = 0d;
  // Scratch buffer: weighted per-function costs written by the latest computeCost call,
  // index-aligned with costFunctions.
  private Double[] tempFunctionCosts;
  // Weighted per-function costs of the best cluster state accepted so far.
  private Double[] curFunctionCosts;

  // Kept as fields (in addition to living in candidateGenerators/costFunctions) so that
  // setMasterServices can hand them the MasterServices once it becomes available.
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  // Also evaluated on their own by areSomeRegionReplicasColocated.
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
  // Read from HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE; sizes the JMX metric registry.
  private boolean isByTable = false;
  // Table of the most recent by-table balance call; cost metrics are only reported if non-null.
  private TableName tableName = null;
152
153  /**
154   * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
155   * default MetricsBalancer
156   */
157  public StochasticLoadBalancer() {
158    super(new MetricsStochasticBalancer());
159  }
160
161  @Override
162  public void onConfigurationChange(Configuration conf) {
163    setConf(conf);
164  }
165
166  @Override
167  public synchronized void setConf(Configuration conf) {
168    super.setConf(conf);
169    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
170    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
171    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
172    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
173
174    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
175    isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
176    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
177    if (localityCandidateGenerator == null) {
178      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
179    }
180    localityCost = new ServerLocalityCostFunction(conf, services);
181    rackLocalityCost = new RackLocalityCostFunction(conf, services);
182
183    if (this.candidateGenerators == null) {
184      candidateGenerators = Lists.newArrayList();
185      candidateGenerators.add(new RandomCandidateGenerator());
186      candidateGenerators.add(new LoadCandidateGenerator());
187      candidateGenerators.add(localityCandidateGenerator);
188      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
189    }
190    regionLoadFunctions = new CostFromRegionLoadFunction[] {
191      new ReadRequestCostFunction(conf),
192      new WriteRequestCostFunction(conf),
193      new MemStoreSizeCostFunction(conf),
194      new StoreFileCostFunction(conf)
195    };
196    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
197    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
198    costFunctions = new CostFunction[]{
199      new RegionCountSkewCostFunction(conf),
200      new PrimaryRegionCountSkewCostFunction(conf),
201      new MoveCostFunction(conf),
202      localityCost,
203      rackLocalityCost,
204      new TableSkewCostFunction(conf),
205      regionReplicaHostCostFunction,
206      regionReplicaRackCostFunction,
207      regionLoadFunctions[0],
208      regionLoadFunctions[1],
209      regionLoadFunctions[2],
210      regionLoadFunctions[3],
211    };
212    curFunctionCosts= new Double[costFunctions.length];
213    tempFunctionCosts= new Double[costFunctions.length];
214    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
215        ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
216  }
217
218  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
219    this.candidateGenerators = customCandidateGenerators;
220  }
221
222  @Override
223  protected void setSlop(Configuration conf) {
224    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
225  }
226
227  @Override
228  public synchronized void setClusterMetrics(ClusterMetrics st) {
229    super.setClusterMetrics(st);
230    updateRegionLoad();
231    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
232      cost.setClusterMetrics(st);
233    }
234
235    // update metrics size
236    try {
237      // by-table or ensemble mode
238      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
239      int functionsCount = getCostFunctionNames().length;
240
241      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
242    } catch (Exception e) {
243      LOG.error("failed to get the size of all tables", e);
244    }
245  }
246
247  /**
248   * Update the number of metrics that are reported to JMX
249   */
250  public void updateMetricsSize(int size) {
251    if (metricsBalancer instanceof MetricsStochasticBalancer) {
252        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
253    }
254  }
255
256  @Override
257  public synchronized void setMasterServices(MasterServices masterServices) {
258    super.setMasterServices(masterServices);
259    this.localityCost.setServices(masterServices);
260    this.rackLocalityCost.setServices(masterServices);
261    this.localityCandidateGenerator.setServices(masterServices);
262  }
263
264  @Override
265  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
266    regionReplicaHostCostFunction.init(c);
267    if (regionReplicaHostCostFunction.cost() > 0) return true;
268    regionReplicaRackCostFunction.init(c);
269    if (regionReplicaRackCostFunction.cost() > 0) return true;
270    return false;
271  }
272
273  @Override
274  protected boolean needsBalance(Cluster cluster) {
275    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
276    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
277      if (LOG.isDebugEnabled()) {
278        LOG.debug("Not running balancer because only " + cs.getNumServers()
279            + " active regionserver(s)");
280      }
281      return false;
282    }
283    if (areSomeRegionReplicasColocated(cluster)) {
284      return true;
285    }
286
287    double total = 0.0;
288    float sumMultiplier = 0.0f;
289    for (CostFunction c : costFunctions) {
290      float multiplier = c.getMultiplier();
291      if (multiplier <= 0) {
292        continue;
293      }
294      if (!c.isNeeded()) {
295        LOG.debug(c.getClass().getName() + " indicated that its cost should not be considered");
296        continue;
297      }
298      sumMultiplier += multiplier;
299      total += c.cost() * multiplier;
300    }
301
302    if (total <= 0 || sumMultiplier <= 0
303        || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) {
304      if (LOG.isTraceEnabled()) {
305        LOG.trace("Skipping load balancing because balanced cluster; " + "total cost is " + total
306          + ", sum multiplier is " + sumMultiplier + " min cost which need balance is "
307          + minCostNeedBalance);
308      }
309      return false;
310    }
311    return true;
312  }
313
314  @Override
315  public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
316    List<RegionInfo>> clusterState) {
317    this.tableName = tableName;
318    return balanceCluster(clusterState);
319  }
320
321  @VisibleForTesting
322  Cluster.Action nextAction(Cluster cluster) {
323    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
324            .generate(cluster);
325  }
326
327  /**
328   * Given the cluster state this will try and approach an optimal balance. This
329   * should always approach the optimal state given enough steps.
330   */
331  @Override
332  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
333    List<RegionInfo>> clusterState) {
334    List<RegionPlan> plans = balanceMasterRegions(clusterState);
335    if (plans != null || clusterState == null || clusterState.size() <= 1) {
336      return plans;
337    }
338
339    if (masterServerName != null && clusterState.containsKey(masterServerName)) {
340      if (clusterState.size() <= 2) {
341        return null;
342      }
343      clusterState = new HashMap<>(clusterState);
344      clusterState.remove(masterServerName);
345    }
346
347    // On clusters with lots of HFileLinks or lots of reference files,
348    // instantiating the storefile infos can be quite expensive.
349    // Allow turning this feature off if the locality cost is not going to
350    // be used in any computations.
351    RegionLocationFinder finder = null;
352    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
353        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
354      finder = this.regionFinder;
355    }
356
357    //The clusterState that is given to this method contains the state
358    //of all the regions in the table(s) (that's true today)
359    // Keep track of servers to iterate through them.
360    Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
361
362    long startTime = EnvironmentEdgeManager.currentTime();
363
364    initCosts(cluster);
365
366    if (!needsBalance(cluster)) {
367      return null;
368    }
369
370    double currentCost = computeCost(cluster, Double.MAX_VALUE);
371    curOverallCost = currentCost;
372    for (int i = 0; i < this.curFunctionCosts.length; i++) {
373      curFunctionCosts[i] = tempFunctionCosts[i];
374    }
375    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
376        + functionCost());
377
378    double initCost = currentCost;
379    double newCost = currentCost;
380
381    long computedMaxSteps;
382    if (runMaxSteps) {
383      computedMaxSteps = Math.max(this.maxSteps,
384          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
385    } else {
386      computedMaxSteps = Math.min(this.maxSteps,
387          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
388    }
389    // Perform a stochastic walk to see if we can get a good fit.
390    long step;
391
392    for (step = 0; step < computedMaxSteps; step++) {
393      Cluster.Action action = nextAction(cluster);
394
395      if (action.type == Type.NULL) {
396        continue;
397      }
398
399      cluster.doAction(action);
400      updateCostsWithAction(cluster, action);
401
402      newCost = computeCost(cluster, currentCost);
403
404      // Should this be kept?
405      if (newCost < currentCost) {
406        currentCost = newCost;
407
408        // save for JMX
409        curOverallCost = currentCost;
410        for (int i = 0; i < this.curFunctionCosts.length; i++) {
411          curFunctionCosts[i] = tempFunctionCosts[i];
412        }
413      } else {
414        // Put things back the way they were before.
415        // TODO: undo by remembering old values
416        Action undoAction = action.undoAction();
417        cluster.doAction(undoAction);
418        updateCostsWithAction(cluster, undoAction);
419      }
420
421      if (EnvironmentEdgeManager.currentTime() - startTime >
422          maxRunningTime) {
423        break;
424      }
425    }
426    long endTime = EnvironmentEdgeManager.currentTime();
427
428    metricsBalancer.balanceCluster(endTime - startTime);
429
430    // update costs metrics
431    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
432    if (initCost > currentCost) {
433      plans = createRegionPlans(cluster);
434      if (LOG.isDebugEnabled()) {
435        LOG.debug("Finished computing new load balance plan.  Computation took "
436            + (endTime - startTime) + "ms to try " + step
437            + " different iterations.  Found a solution that moves "
438            + plans.size() + " regions; Going from a computed cost of "
439            + initCost + " to a new cost of " + currentCost);
440      }
441
442      return plans;
443    }
444    if (LOG.isDebugEnabled()) {
445      LOG.debug("Could not find a better load balance plan.  Tried "
446          + step + " different configurations in " + (endTime - startTime)
447          + "ms, and did not find anything with a computed cost less than " + initCost);
448    }
449    return null;
450  }
451
452  /**
453   * update costs to JMX
454   */
455  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
456    if (tableName == null) return;
457
458    // check if the metricsBalancer is MetricsStochasticBalancer before casting
459    if (metricsBalancer instanceof MetricsStochasticBalancer) {
460      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
461      // overall cost
462      balancer.updateStochasticCost(tableName.getNameAsString(),
463        "Overall", "Overall cost", overall);
464
465      // each cost function
466      for (int i = 0; i < costFunctions.length; i++) {
467        CostFunction costFunction = costFunctions[i];
468        String costFunctionName = costFunction.getClass().getSimpleName();
469        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
470        // TODO: cost function may need a specific description
471        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
472          "The percent of " + costFunctionName, costPercent);
473      }
474    }
475  }
476
477  private String functionCost() {
478    StringBuilder builder = new StringBuilder();
479    for (CostFunction c:costFunctions) {
480      builder.append(c.getClass().getSimpleName());
481      builder.append(" : (");
482      builder.append(c.getMultiplier());
483      builder.append(", ");
484      builder.append(c.cost());
485      builder.append("); ");
486    }
487    return builder.toString();
488  }
489
490  /**
491   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
492   * state.
493   *
494   * @param cluster The state of the cluster
495   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
496   */
497  private List<RegionPlan> createRegionPlans(Cluster cluster) {
498    List<RegionPlan> plans = new LinkedList<>();
499    for (int regionIndex = 0;
500         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
501      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
502      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
503
504      if (initialServerIndex != newServerIndex) {
505        RegionInfo region = cluster.regions[regionIndex];
506        ServerName initialServer = cluster.servers[initialServerIndex];
507        ServerName newServer = cluster.servers[newServerIndex];
508
509        if (LOG.isTraceEnabled()) {
510          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
511              + initialServer.getHostname() + " to " + newServer.getHostname());
512        }
513        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
514        plans.add(rp);
515      }
516    }
517    return plans;
518  }
519
520  /**
521   * Store the current region loads.
522   */
523  private synchronized void updateRegionLoad() {
524    // We create a new hashmap so that regions that are no longer there are removed.
525    // However we temporarily need the old loads so we can use them to keep the rolling average.
526    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
527    loads = new HashMap<>();
528
529    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
530      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
531        Deque<BalancerRegionLoad> rLoads = oldLoads.get(Bytes.toString(regionName));
532        if (rLoads == null) {
533          // There was nothing there
534          rLoads = new ArrayDeque<>();
535        } else if (rLoads.size() >= numRegionLoadsToRemember) {
536          rLoads.remove();
537        }
538        rLoads.add(new BalancerRegionLoad(rm));
539        loads.put(Bytes.toString(regionName), rLoads);
540      });
541    });
542
543    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
544      cost.setLoads(loads);
545    }
546  }
547
548  protected void initCosts(Cluster cluster) {
549    for (CostFunction c:costFunctions) {
550      c.init(cluster);
551    }
552  }
553
554  protected void updateCostsWithAction(Cluster cluster, Action action) {
555    for (CostFunction c : costFunctions) {
556      c.postAction(action);
557    }
558  }
559
560  /**
561   * Get the names of the cost functions
562   */
563  public String[] getCostFunctionNames() {
564    if (costFunctions == null) return null;
565    String[] ret = new String[costFunctions.length];
566    for (int i = 0; i < costFunctions.length; i++) {
567      CostFunction c = costFunctions[i];
568      ret[i] = c.getClass().getSimpleName();
569    }
570
571    return ret;
572  }
573
574  /**
575   * This is the main cost function.  It will compute a cost associated with a proposed cluster
576   * state.  All different costs will be combined with their multipliers to produce a double cost.
577   *
578   * @param cluster The state of the cluster
579   * @param previousCost the previous cost. This is used as an early out.
580   * @return a double of a cost associated with the proposed cluster state.  This cost is an
581   *         aggregate of all individual cost functions.
582   */
583  protected double computeCost(Cluster cluster, double previousCost) {
584    double total = 0;
585
586    for (int i = 0; i < costFunctions.length; i++) {
587      CostFunction c = costFunctions[i];
588      this.tempFunctionCosts[i] = 0.0;
589
590      if (c.getMultiplier() <= 0) {
591        continue;
592      }
593
594      Float multiplier = c.getMultiplier();
595      Double cost = c.cost();
596
597      this.tempFunctionCosts[i] = multiplier*cost;
598      total += this.tempFunctionCosts[i];
599
600      if (total > previousCost) {
601        break;
602      }
603    }
604
605    return total;
606  }
607
608  /** Generates a candidate action to be applied to the cluster for cost function search */
609  abstract static class CandidateGenerator {
610    abstract Cluster.Action generate(Cluster cluster);
611
612    /**
613     * From a list of regions pick a random one. Null can be returned which
614     * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
615     * rather than swap.
616     *
617     * @param cluster        The state of the cluster
618     * @param server         index of the server
619     * @param chanceOfNoSwap Chance that this will decide to try a move rather
620     *                       than a swap.
621     * @return a random {@link RegionInfo} or null if an asymmetrical move is
622     *         suggested.
623     */
624    protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
625      // Check to see if this is just a move.
626      if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
627        // signal a move only.
628        return -1;
629      }
630      int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
631      return cluster.regionsPerServer[server][rand];
632
633    }
634    protected int pickRandomServer(Cluster cluster) {
635      if (cluster.numServers < 1) {
636        return -1;
637      }
638
639      return RANDOM.nextInt(cluster.numServers);
640    }
641
642    protected int pickRandomRack(Cluster cluster) {
643      if (cluster.numRacks < 1) {
644        return -1;
645      }
646
647      return RANDOM.nextInt(cluster.numRacks);
648    }
649
650    protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
651      if (cluster.numServers < 2) {
652        return -1;
653      }
654      while (true) {
655        int otherServerIndex = pickRandomServer(cluster);
656        if (otherServerIndex != serverIndex) {
657          return otherServerIndex;
658        }
659      }
660    }
661
662    protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
663      if (cluster.numRacks < 2) {
664        return -1;
665      }
666      while (true) {
667        int otherRackIndex = pickRandomRack(cluster);
668        if (otherRackIndex != rackIndex) {
669          return otherRackIndex;
670        }
671      }
672    }
673
674    protected Cluster.Action pickRandomRegions(Cluster cluster,
675                                                       int thisServer,
676                                                       int otherServer) {
677      if (thisServer < 0 || otherServer < 0) {
678        return Cluster.NullAction;
679      }
680
681      // Decide who is most likely to need another region
682      int thisRegionCount = cluster.getNumRegions(thisServer);
683      int otherRegionCount = cluster.getNumRegions(otherServer);
684
685      // Assign the chance based upon the above
686      double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
687      double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
688
689      int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
690      int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
691
692      return getAction(thisServer, thisRegion, otherServer, otherRegion);
693    }
694
695    protected Cluster.Action getAction(int fromServer, int fromRegion,
696        int toServer, int toRegion) {
697      if (fromServer < 0 || toServer < 0) {
698        return Cluster.NullAction;
699      }
700      if (fromRegion > 0 && toRegion > 0) {
701        return new Cluster.SwapRegionsAction(fromServer, fromRegion,
702          toServer, toRegion);
703      } else if (fromRegion > 0) {
704        return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
705      } else if (toRegion > 0) {
706        return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
707      } else {
708        return Cluster.NullAction;
709      }
710    }
711
712    /**
713     * Returns a random iteration order of indexes of an array with size length
714     */
715    protected List<Integer> getRandomIterationOrder(int length) {
716      ArrayList<Integer> order = new ArrayList<>(length);
717      for (int i = 0; i < length; i++) {
718        order.add(i);
719      }
720      Collections.shuffle(order);
721      return order;
722    }
723  }
724
725  static class RandomCandidateGenerator extends CandidateGenerator {
726
727    @Override
728    Cluster.Action generate(Cluster cluster) {
729
730      int thisServer = pickRandomServer(cluster);
731
732      // Pick the other server
733      int otherServer = pickOtherRandomServer(cluster, thisServer);
734
735      return pickRandomRegions(cluster, thisServer, otherServer);
736    }
737  }
738
739  static class LoadCandidateGenerator extends CandidateGenerator {
740
741    @Override
742    Cluster.Action generate(Cluster cluster) {
743      cluster.sortServersByRegionCount();
744      int thisServer = pickMostLoadedServer(cluster, -1);
745      int otherServer = pickLeastLoadedServer(cluster, thisServer);
746
747      return pickRandomRegions(cluster, thisServer, otherServer);
748    }
749
750    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
751      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
752
753      int index = 0;
754      while (servers[index] == null || servers[index] == thisServer) {
755        index++;
756        if (index == servers.length) {
757          return -1;
758        }
759      }
760      return servers[index];
761    }
762
763    private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
764      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
765
766      int index = servers.length - 1;
767      while (servers[index] == null || servers[index] == thisServer) {
768        index--;
769        if (index < 0) {
770          return -1;
771        }
772      }
773      return servers[index];
774    }
775  }
776
777  static class LocalityBasedCandidateGenerator extends CandidateGenerator {
778
779    private MasterServices masterServices;
780
781    LocalityBasedCandidateGenerator(MasterServices masterServices) {
782      this.masterServices = masterServices;
783    }
784
785    @Override
786    Cluster.Action generate(Cluster cluster) {
787      if (this.masterServices == null) {
788        int thisServer = pickRandomServer(cluster);
789        // Pick the other server
790        int otherServer = pickOtherRandomServer(cluster, thisServer);
791        return pickRandomRegions(cluster, thisServer, otherServer);
792      }
793
794      // Randomly iterate through regions until you find one that is not on ideal host
795      for (int region : getRandomIterationOrder(cluster.numRegions)) {
796        int currentServer = cluster.regionIndexToServerIndex[region];
797        if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
798          Optional<Action> potential = tryMoveOrSwap(
799              cluster,
800              currentServer,
801              region,
802              cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
803          );
804          if (potential.isPresent()) {
805            return potential.get();
806          }
807        }
808      }
809      return Cluster.NullAction;
810    }
811
812    /**
813     * Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
814     * Returns empty optional if no move can be found
815     */
816    private Optional<Action> tryMoveOrSwap(Cluster cluster,
817                                           int fromServer,
818                                           int fromRegion,
819                                           int toServer) {
820      // Try move first. We know apriori fromRegion has the highest locality on toServer
821      if (cluster.serverHasTooFewRegions(toServer)) {
822        return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
823      }
824
825      // Compare locality gain/loss from swapping fromRegion with regions on toServer
826      double fromRegionLocalityDelta =
827          getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
828      for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
829        int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
830        double toRegionLocalityDelta =
831            getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
832        // If locality would remain neutral or improve, attempt the swap
833        if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
834          return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
835        }
836      }
837
838      return Optional.absent();
839    }
840
841    private double getWeightedLocality(Cluster cluster, int region, int server) {
842      return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
843    }
844
845    void setServices(MasterServices services) {
846      this.masterServices = services;
847    }
848  }
849
850  /**
851   * Generates candidates which moves the replicas out of the region server for
852   * co-hosted region replicas
853   */
854  static class RegionReplicaCandidateGenerator extends CandidateGenerator {
855
856    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
857
858    /**
859     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
860     * (a group is a server, host or rack)
861     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
862     * primariesOfRegionsPerHost or primariesOfRegionsPerRack
863     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
864     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
865     * @return a regionIndex for the selected primary or -1 if there is no co-locating
866     */
867    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
868        , int[] regionIndexToPrimaryIndex) {
869      int currentPrimary = -1;
870      int currentPrimaryIndex = -1;
871      int selectedPrimaryIndex = -1;
872      double currentLargestRandom = -1;
873      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
874      // ids for the regions hosted in server, a consecutive repetition means that replicas
875      // are co-hosted
876      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
877        int primary = j < primariesOfRegionsPerGroup.length
878            ? primariesOfRegionsPerGroup[j] : -1;
879        if (primary != currentPrimary) { // check for whether we see a new primary
880          int numReplicas = j - currentPrimaryIndex;
881          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
882            // decide to select this primary region id or not
883            double currentRandom = RANDOM.nextDouble();
884            // we don't know how many region replicas are co-hosted, we will randomly select one
885            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
886            if (currentRandom > currentLargestRandom) {
887              selectedPrimaryIndex = currentPrimary;
888              currentLargestRandom = currentRandom;
889            }
890          }
891          currentPrimary = primary;
892          currentPrimaryIndex = j;
893        }
894      }
895
896      // we have found the primary id for the region to move. Now find the actual regionIndex
897      // with the given primary, prefer to move the secondary region.
898      for (int j = 0; j < regionsPerGroup.length; j++) {
899        int regionIndex = regionsPerGroup[j];
900        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
901          // always move the secondary, not the primary
902          if (selectedPrimaryIndex != regionIndex) {
903            return regionIndex;
904          }
905        }
906      }
907      return -1;
908    }
909
910    @Override
911    Cluster.Action generate(Cluster cluster) {
912      int serverIndex = pickRandomServer(cluster);
913      if (cluster.numServers <= 1 || serverIndex == -1) {
914        return Cluster.NullAction;
915      }
916
917      int regionIndex = selectCoHostedRegionPerGroup(
918        cluster.primariesOfRegionsPerServer[serverIndex],
919        cluster.regionsPerServer[serverIndex],
920        cluster.regionIndexToPrimaryIndex);
921
922      // if there are no pairs of region replicas co-hosted, default to random generator
923      if (regionIndex == -1) {
924        // default to randompicker
925        return randomGenerator.generate(cluster);
926      }
927
928      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
929      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
930      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
931    }
932  }
933
934  /**
935   * Generates candidates which moves the replicas out of the rack for
936   * co-hosted region replicas in the same rack
937   */
938  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
939    @Override
940    Cluster.Action generate(Cluster cluster) {
941      int rackIndex = pickRandomRack(cluster);
942      if (cluster.numRacks <= 1 || rackIndex == -1) {
943        return super.generate(cluster);
944      }
945
946      int regionIndex = selectCoHostedRegionPerGroup(
947        cluster.primariesOfRegionsPerRack[rackIndex],
948        cluster.regionsPerRack[rackIndex],
949        cluster.regionIndexToPrimaryIndex);
950
951      // if there are no pairs of region replicas co-hosted, default to random generator
952      if (regionIndex == -1) {
953        // default to randompicker
954        return randomGenerator.generate(cluster);
955      }
956
957      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
958      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
959
960      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
961      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
962      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
963      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
964    }
965  }
966
967  /**
968   * Base class of StochasticLoadBalancer's Cost Functions.
969   */
970  abstract static class CostFunction {
971
972    private float multiplier = 0;
973
974    protected Cluster cluster;
975
976    CostFunction(Configuration c) {
977    }
978
979    boolean isNeeded() {
980      return true;
981    }
982    float getMultiplier() {
983      return multiplier;
984    }
985
986    void setMultiplier(float m) {
987      this.multiplier = m;
988    }
989
990    /** Called once per LB invocation to give the cost function
991     * to initialize it's state, and perform any costly calculation.
992     */
993    void init(Cluster cluster) {
994      this.cluster = cluster;
995    }
996
997    /** Called once per cluster Action to give the cost function
998     * an opportunity to update it's state. postAction() is always
999     * called at least once before cost() is called with the cluster
1000     * that this action is performed on. */
1001    void postAction(Action action) {
1002      switch (action.type) {
1003      case NULL: break;
1004      case ASSIGN_REGION:
1005        AssignRegionAction ar = (AssignRegionAction) action;
1006        regionMoved(ar.region, -1, ar.server);
1007        break;
1008      case MOVE_REGION:
1009        MoveRegionAction mra = (MoveRegionAction) action;
1010        regionMoved(mra.region, mra.fromServer, mra.toServer);
1011        break;
1012      case SWAP_REGIONS:
1013        SwapRegionsAction a = (SwapRegionsAction) action;
1014        regionMoved(a.fromRegion, a.fromServer, a.toServer);
1015        regionMoved(a.toRegion, a.toServer, a.fromServer);
1016        break;
1017      default:
1018        throw new RuntimeException("Uknown action:" + action.type);
1019      }
1020    }
1021
1022    protected void regionMoved(int region, int oldServer, int newServer) {
1023    }
1024
1025    abstract double cost();
1026
1027    /**
1028     * Function to compute a scaled cost using {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics}.
1029     * It assumes that this is a zero sum set of costs.  It assumes that the worst case
1030     * possible is all of the elements in one region server and the rest having 0.
1031     *
1032     * @param stats the costs
1033     * @return a scaled set of costs.
1034     */
1035    protected double costFromArray(double[] stats) {
1036      double totalCost = 0;
1037      double total = getSum(stats);
1038
1039      double count = stats.length;
1040      double mean = total/count;
1041
1042      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
1043      // a zero sum cost for this to make sense.
1044      double max = ((count - 1) * mean) + (total - mean);
1045
1046      // It's possible that there aren't enough regions to go around
1047      double min;
1048      if (count > total) {
1049        min = ((count - total) * mean) + ((1 - mean) * total);
1050      } else {
1051        // Some will have 1 more than everything else.
1052        int numHigh = (int) (total - (Math.floor(mean) * count));
1053        int numLow = (int) (count - numHigh);
1054
1055        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
1056
1057      }
1058      min = Math.max(0, min);
1059      for (int i=0; i<stats.length; i++) {
1060        double n = stats[i];
1061        double diff = Math.abs(mean - n);
1062        totalCost += diff;
1063      }
1064
1065      double scaled =  scale(min, max, totalCost);
1066      return scaled;
1067    }
1068
1069    private double getSum(double[] stats) {
1070      double total = 0;
1071      for(double s:stats) {
1072        total += s;
1073      }
1074      return total;
1075    }
1076
1077    /**
1078     * Scale the value between 0 and 1.
1079     *
1080     * @param min   Min value
1081     * @param max   The Max value
1082     * @param value The value to be scaled.
1083     * @return The scaled value.
1084     */
1085    protected double scale(double min, double max, double value) {
1086      if (max <= min || value <= min) {
1087        return 0;
1088      }
1089      if ((max - min) == 0) return 0;
1090
1091      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
1092    }
1093  }
1094
1095  /**
1096   * Given the starting state of the regions and a potential ending state
1097   * compute cost based upon the number of regions that have moved.
1098   */
1099  static class MoveCostFunction extends CostFunction {
1100    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
1101    private static final String MAX_MOVES_PERCENT_KEY =
1102        "hbase.master.balancer.stochastic.maxMovePercent";
1103    private static final float DEFAULT_MOVE_COST = 7;
1104    private static final int DEFAULT_MAX_MOVES = 600;
1105    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1106
1107    private final float maxMovesPercent;
1108
1109    MoveCostFunction(Configuration conf) {
1110      super(conf);
1111
1112      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
1113      // that large benefits are need to overcome the cost of a move.
1114      this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1115      // What percent of the number of regions a single run of the balancer can move.
1116      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1117    }
1118
1119    @Override
1120    double cost() {
1121      // Try and size the max number of Moves, but always be prepared to move some.
1122      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1123          DEFAULT_MAX_MOVES);
1124
1125      double moveCost = cluster.numMovedRegions;
1126
1127      // Don't let this single balance move more than the max moves.
1128      // This allows better scaling to accurately represent the actual cost of a move.
1129      if (moveCost > maxMoves) {
1130        return 1000000;   // return a number much greater than any of the other cost
1131      }
1132
1133      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
1134    }
1135  }
1136
1137  /**
1138   * Compute the cost of a potential cluster state from skew in number of
1139   * regions on a cluster.
1140   */
1141  static class RegionCountSkewCostFunction extends CostFunction {
1142    private static final String REGION_COUNT_SKEW_COST_KEY =
1143        "hbase.master.balancer.stochastic.regionCountCost";
1144    private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1145
1146    private double[] stats = null;
1147
1148    RegionCountSkewCostFunction(Configuration conf) {
1149      super(conf);
1150      // Load multiplier should be the greatest as it is the most general way to balance data.
1151      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1152    }
1153
1154    @Override
1155    double cost() {
1156      if (stats == null || stats.length != cluster.numServers) {
1157        stats = new double[cluster.numServers];
1158      }
1159
1160      for (int i =0; i < cluster.numServers; i++) {
1161        stats[i] = cluster.regionsPerServer[i].length;
1162      }
1163
1164      return costFromArray(stats);
1165    }
1166  }
1167
1168  /**
1169   * Compute the cost of a potential cluster state from skew in number of
1170   * primary regions on a cluster.
1171   */
1172  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1173    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1174        "hbase.master.balancer.stochastic.primaryRegionCountCost";
1175    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1176
1177    private double[] stats = null;
1178
1179    PrimaryRegionCountSkewCostFunction(Configuration conf) {
1180      super(conf);
1181      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1182      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1183        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1184    }
1185
1186    @Override
1187    double cost() {
1188      if (!cluster.hasRegionReplicas) {
1189        return 0;
1190      }
1191      if (stats == null || stats.length != cluster.numServers) {
1192        stats = new double[cluster.numServers];
1193      }
1194
1195      for (int i = 0; i < cluster.numServers; i++) {
1196        stats[i] = 0;
1197        for (int regionIdx : cluster.regionsPerServer[i]) {
1198          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1199            stats[i]++;
1200          }
1201        }
1202      }
1203
1204      return costFromArray(stats);
1205    }
1206  }
1207
1208  /**
1209   * Compute the cost of a potential cluster configuration based upon how evenly
1210   * distributed tables are.
1211   */
1212  static class TableSkewCostFunction extends CostFunction {
1213
1214    private static final String TABLE_SKEW_COST_KEY =
1215        "hbase.master.balancer.stochastic.tableSkewCost";
1216    private static final float DEFAULT_TABLE_SKEW_COST = 35;
1217
1218    TableSkewCostFunction(Configuration conf) {
1219      super(conf);
1220      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1221    }
1222
1223    @Override
1224    double cost() {
1225      double max = cluster.numRegions;
1226      double min = ((double) cluster.numRegions) / cluster.numServers;
1227      double value = 0;
1228
1229      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1230        value += cluster.numMaxRegionsPerTable[i];
1231      }
1232
1233      return scale(min, max, value);
1234    }
1235  }
1236
1237  /**
1238   * Compute a cost of a potential cluster configuration based upon where
1239   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
1240   */
1241  static abstract class LocalityBasedCostFunction extends CostFunction {
1242
1243    private final LocalityType type;
1244
1245    private double bestLocality; // best case locality across cluster weighted by local data size
1246    private double locality; // current locality across cluster weighted by local data size
1247
1248    private MasterServices services;
1249
1250    LocalityBasedCostFunction(Configuration conf,
1251                              MasterServices srv,
1252                              LocalityType type,
1253                              String localityCostKey,
1254                              float defaultLocalityCost) {
1255      super(conf);
1256      this.type = type;
1257      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1258      this.services = srv;
1259      this.locality = 0.0;
1260      this.bestLocality = 0.0;
1261    }
1262
1263    /**
1264     * Maps region to the current entity (server or rack) on which it is stored
1265     */
1266    abstract int regionIndexToEntityIndex(int region);
1267
1268    public void setServices(MasterServices srvc) {
1269      this.services = srvc;
1270    }
1271
1272    @Override
1273    void init(Cluster cluster) {
1274      super.init(cluster);
1275      locality = 0.0;
1276      bestLocality = 0.0;
1277
1278      // If no master, no computation will work, so assume 0 cost
1279      if (this.services == null) {
1280        return;
1281      }
1282
1283      for (int region = 0; region < cluster.numRegions; region++) {
1284        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1285        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1286      }
1287
1288      // We normalize locality to be a score between 0 and 1.0 representing how good it
1289      // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1290      // (and the cost is 0)
1291      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1292    }
1293
1294    @Override
1295    protected void regionMoved(int region, int oldServer, int newServer) {
1296      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1297      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1298      if (this.services == null) {
1299        return;
1300      }
1301      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1302      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1303      locality += normalizedDelta;
1304    }
1305
1306    @Override
1307    double cost() {
1308      return 1 - locality;
1309    }
1310
1311    private int getMostLocalEntityForRegion(int region) {
1312      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1313    }
1314
1315    private double getWeightedLocality(int region, int entity) {
1316      return cluster.getOrComputeWeightedLocality(region, entity, type);
1317    }
1318
1319  }
1320
1321  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1322
1323    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1324    private static final float DEFAULT_LOCALITY_COST = 25;
1325
1326    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1327      super(
1328          conf,
1329          srv,
1330          LocalityType.SERVER,
1331          LOCALITY_COST_KEY,
1332          DEFAULT_LOCALITY_COST
1333      );
1334    }
1335
1336    @Override
1337    int regionIndexToEntityIndex(int region) {
1338      return cluster.regionIndexToServerIndex[region];
1339    }
1340  }
1341
1342  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1343
1344    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1345    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1346
1347    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1348      super(
1349          conf,
1350          services,
1351          LocalityType.RACK,
1352          RACK_LOCALITY_COST_KEY,
1353          DEFAULT_RACK_LOCALITY_COST
1354      );
1355    }
1356
1357    @Override
1358    int regionIndexToEntityIndex(int region) {
1359      return cluster.getRackForRegion(region);
1360    }
1361  }
1362
1363  /**
1364   * Base class the allows writing costs functions from rolling average of some
1365   * number from RegionLoad.
1366   */
1367  abstract static class CostFromRegionLoadFunction extends CostFunction {
1368
1369    private ClusterMetrics clusterStatus = null;
1370    private Map<String, Deque<BalancerRegionLoad>> loads = null;
1371    private double[] stats = null;
1372    CostFromRegionLoadFunction(Configuration conf) {
1373      super(conf);
1374    }
1375
1376    void setClusterMetrics(ClusterMetrics status) {
1377      this.clusterStatus = status;
1378    }
1379
1380    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1381      this.loads = l;
1382    }
1383
1384    @Override
1385    double cost() {
1386      if (clusterStatus == null || loads == null) {
1387        return 0;
1388      }
1389
1390      if (stats == null || stats.length != cluster.numServers) {
1391        stats = new double[cluster.numServers];
1392      }
1393
1394      for (int i =0; i < stats.length; i++) {
1395        //Cost this server has from RegionLoad
1396        long cost = 0;
1397
1398        // for every region on this server get the rl
1399        for(int regionIndex:cluster.regionsPerServer[i]) {
1400          Collection<BalancerRegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1401
1402          // Now if we found a region load get the type of cost that was requested.
1403          if (regionLoadList != null) {
1404            cost = (long) (cost + getRegionLoadCost(regionLoadList));
1405          }
1406        }
1407
1408        // Add the total cost to the stats.
1409        stats[i] = cost;
1410      }
1411
1412      // Now return the scaled cost from data held in the stats object.
1413      return costFromArray(stats);
1414    }
1415
1416    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1417      double cost = 0;
1418      for (BalancerRegionLoad rl : regionLoadList) {
1419        cost += getCostFromRl(rl);
1420      }
1421      return cost / regionLoadList.size();
1422    }
1423
1424    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1425  }
1426
1427  /**
1428   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
1429   * We do not compare about the actual rate in requests per second but rather the rate relative
1430   * to the rest of the regions.
1431   */
1432  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1433
1434    CostFromRegionLoadAsRateFunction(Configuration conf) {
1435      super(conf);
1436    }
1437
1438    @Override
1439    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1440      double cost = 0;
1441      double previous = 0;
1442      boolean isFirst = true;
1443      for (BalancerRegionLoad rl : regionLoadList) {
1444        double current = getCostFromRl(rl);
1445        if (isFirst) {
1446          isFirst = false;
1447        } else {
1448          cost += current - previous;
1449        }
1450        previous = current;
1451      }
1452      return Math.max(0, cost / (regionLoadList.size() - 1));
1453    }
1454  }
1455
1456  /**
1457   * Compute the cost of total number of read requests  The more unbalanced the higher the
1458   * computed cost will be.  This uses a rolling average of regionload.
1459   */
1460
1461  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1462
1463    private static final String READ_REQUEST_COST_KEY =
1464        "hbase.master.balancer.stochastic.readRequestCost";
1465    private static final float DEFAULT_READ_REQUEST_COST = 5;
1466
1467    ReadRequestCostFunction(Configuration conf) {
1468      super(conf);
1469      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1470    }
1471
1472    @Override
1473    protected double getCostFromRl(BalancerRegionLoad rl) {
1474      return rl.getReadRequestsCount();
1475    }
1476  }
1477
1478  /**
1479   * Compute the cost of total number of write requests.  The more unbalanced the higher the
1480   * computed cost will be.  This uses a rolling average of regionload.
1481   */
1482  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1483
1484    private static final String WRITE_REQUEST_COST_KEY =
1485        "hbase.master.balancer.stochastic.writeRequestCost";
1486    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1487
1488    WriteRequestCostFunction(Configuration conf) {
1489      super(conf);
1490      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1491    }
1492
1493    @Override
1494    protected double getCostFromRl(BalancerRegionLoad rl) {
1495      return rl.getWriteRequestsCount();
1496    }
1497  }
1498
1499  /**
1500   * A cost function for region replicas. We give a very high cost to hosting
1501   * replicas of the same region in the same host. We do not prevent the case
1502   * though, since if numReplicas > numRegionServers, we still want to keep the
1503   * replica open.
1504   */
1505  static class RegionReplicaHostCostFunction extends CostFunction {
1506    private static final String REGION_REPLICA_HOST_COST_KEY =
1507        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1508    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1509
1510    long maxCost = 0;
1511    long[] costsPerGroup; // group is either server, host or rack
1512    int[][] primariesOfRegionsPerGroup;
1513
1514    public RegionReplicaHostCostFunction(Configuration conf) {
1515      super(conf);
1516      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1517        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1518    }
1519
1520    @Override
1521    void init(Cluster cluster) {
1522      super.init(cluster);
1523      // max cost is the case where every region replica is hosted together regardless of host
1524      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1525      costsPerGroup = new long[cluster.numHosts];
1526      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1527          ? cluster.primariesOfRegionsPerHost
1528          : cluster.primariesOfRegionsPerServer;
1529      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1530        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1531      }
1532    }
1533
1534    long getMaxCost(Cluster cluster) {
1535      if (!cluster.hasRegionReplicas) {
1536        return 0; // short circuit
1537      }
1538      // max cost is the case where every region replica is hosted together regardless of host
1539      int[] primariesOfRegions = new int[cluster.numRegions];
1540      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1541          cluster.regions.length);
1542
1543      Arrays.sort(primariesOfRegions);
1544
1545      // compute numReplicas from the sorted array
1546      return costPerGroup(primariesOfRegions);
1547    }
1548
1549    @Override
1550    boolean isNeeded() {
1551      return cluster.hasRegionReplicas;
1552    }
1553
1554    @Override
1555    double cost() {
1556      if (maxCost <= 0) {
1557        return 0;
1558      }
1559
1560      long totalCost = 0;
1561      for (int i = 0 ; i < costsPerGroup.length; i++) {
1562        totalCost += costsPerGroup[i];
1563      }
1564      return scale(0, maxCost, totalCost);
1565    }
1566
1567    /**
1568     * For each primary region, it computes the total number of replicas in the array (numReplicas)
1569     * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1570     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1571     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1572     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1573     * @return a sum of numReplicas-1 squared for each primary region in the group.
1574     */
1575    protected long costPerGroup(int[] primariesOfRegions) {
1576      long cost = 0;
1577      int currentPrimary = -1;
1578      int currentPrimaryIndex = -1;
1579      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1580      // sharing the same primary will have consecutive numbers in the array.
1581      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1582        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1583        if (primary != currentPrimary) { // we see a new primary
1584          int numReplicas = j - currentPrimaryIndex;
1585          // square the cost
1586          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1587            cost += (numReplicas - 1) * (numReplicas - 1);
1588          }
1589          currentPrimary = primary;
1590          currentPrimaryIndex = j;
1591        }
1592      }
1593
1594      return cost;
1595    }
1596
1597    @Override
1598    protected void regionMoved(int region, int oldServer, int newServer) {
1599      if (maxCost <= 0) {
1600        return; // no need to compute
1601      }
1602      if (cluster.multiServersPerHost) {
1603        int oldHost = cluster.serverIndexToHostIndex[oldServer];
1604        int newHost = cluster.serverIndexToHostIndex[newServer];
1605        if (newHost != oldHost) {
1606          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1607          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1608        }
1609      } else {
1610        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1611        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1612      }
1613    }
1614  }
1615
1616  /**
1617   * A cost function for region replicas for the rack distribution. We give a relatively high
1618   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1619   * though.
1620   */
1621  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1622    private static final String REGION_REPLICA_RACK_COST_KEY =
1623        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1624    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1625
1626    public RegionReplicaRackCostFunction(Configuration conf) {
1627      super(conf);
1628      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1629        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1630    }
1631
1632    @Override
1633    void init(Cluster cluster) {
1634      this.cluster = cluster;
1635      if (cluster.numRacks <= 1) {
1636        maxCost = 0;
1637        return; // disabled for 1 rack
1638      }
1639      // max cost is the case where every region replica is hosted together regardless of rack
1640      maxCost = getMaxCost(cluster);
1641      costsPerGroup = new long[cluster.numRacks];
1642      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1643        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1644      }
1645    }
1646
1647    @Override
1648    protected void regionMoved(int region, int oldServer, int newServer) {
1649      if (maxCost <= 0) {
1650        return; // no need to compute
1651      }
1652      int oldRack = cluster.serverIndexToRackIndex[oldServer];
1653      int newRack = cluster.serverIndexToRackIndex[newServer];
1654      if (newRack != oldRack) {
1655        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1656        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1657      }
1658    }
1659  }
1660
1661  /**
1662   * Compute the cost of total memstore size.  The more unbalanced the higher the
1663   * computed cost will be.  This uses a rolling average of regionload.
1664   */
1665  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
1666
1667    private static final String MEMSTORE_SIZE_COST_KEY =
1668        "hbase.master.balancer.stochastic.memstoreSizeCost";
1669    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1670
1671    MemStoreSizeCostFunction(Configuration conf) {
1672      super(conf);
1673      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1674    }
1675
1676    @Override
1677    protected double getCostFromRl(BalancerRegionLoad rl) {
1678      return rl.getMemStoreSizeMB();
1679    }
1680  }
1681  /**
1682   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1683   * computed cost will be.  This uses a rolling average of regionload.
1684   */
1685  static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1686
1687    private static final String STOREFILE_SIZE_COST_KEY =
1688        "hbase.master.balancer.stochastic.storefileSizeCost";
1689    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1690
1691    StoreFileCostFunction(Configuration conf) {
1692      super(conf);
1693      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1694    }
1695
1696    @Override
1697    protected double getCostFromRl(BalancerRegionLoad rl) {
1698      return rl.getStorefileSizeMB();
1699    }
1700  }
1701
1702  /**
1703   * A helper function to compose the attribute name from tablename and costfunction name
1704   */
1705  public static String composeAttributeName(String tableName, String costFunctionName) {
1706    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1707  }
1708}