001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Random;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.HBaseInterfaceAudience;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.RegionMetrics;
036import org.apache.hadoop.hbase.ServerMetrics;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.master.MasterServices;
041import org.apache.hadoop.hbase.master.RegionPlan;
042import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
054import org.apache.hbase.thirdparty.com.google.common.base.Optional;
055import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
056
057
058/**
 * <p>This is a best-effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C), then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
062 * <ul>
063 * <li>Region Load</li>
064 * <li>Table Load</li>
065 * <li>Data Locality</li>
066 * <li>Memstore Sizes</li>
067 * <li>Storefile Sizes</li>
068 * </ul>
069 *
070 *
071 * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
072 * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
073 * scaled by their respective multipliers:</p>
074 *
075 * <ul>
076 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
077 *   <li>hbase.master.balancer.stochastic.moveCost</li>
078 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
079 *   <li>hbase.master.balancer.stochastic.localityCost</li>
080 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
081 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
082 * </ul>
083 *
084 * <p>In addition to the above configurations, the balancer can be tuned by the following
085 * configuration values:</p>
086 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions, which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
 *   <li>hbase.master.balancer.stochastic.stepsPerRegion, the coefficient by which the number of
 *   regions (times the number of servers) is multiplied to get the number of times the balancer
 *   will try to mutate the cluster.</li>
 *   <li>hbase.master.balancer.stochastic.maxSteps, which controls the maximum number of times that
 *   the balancer will try to mutate the cluster. The balancer will use the minimum of this
 *   value and the above computation (or the maximum of the two when
 *   hbase.master.balancer.stochastic.runMaxSteps is set to true).</li>
096 * </ul>
097 *
098 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
099 * so that the balancer gets the full picture of all loads on the cluster.</p>
100 */
101@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
102@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
103  justification="Complaint is about costFunctions not being synchronized; not end of the world")
104public class StochasticLoadBalancer extends BaseLoadBalancer {
105
106  protected static final String STEPS_PER_REGION_KEY =
107      "hbase.master.balancer.stochastic.stepsPerRegion";
108  protected static final String MAX_STEPS_KEY =
109      "hbase.master.balancer.stochastic.maxSteps";
110  protected static final String RUN_MAX_STEPS_KEY =
111      "hbase.master.balancer.stochastic.runMaxSteps";
112  protected static final String MAX_RUNNING_TIME_KEY =
113      "hbase.master.balancer.stochastic.maxRunningTime";
114  protected static final String KEEP_REGION_LOADS =
115      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
116  private static final String TABLE_FUNCTION_SEP = "_";
117  protected static final String MIN_COST_NEED_BALANCE_KEY =
118      "hbase.master.balancer.stochastic.minCostNeedBalance";
119
120  protected static final Random RANDOM = new Random(System.currentTimeMillis());
121  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
122
123  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
124
125  // values are defaults
126  private int maxSteps = 1000000;
127  private boolean runMaxSteps = false;
128  private int stepsPerRegion = 800;
129  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
130  private int numRegionLoadsToRemember = 15;
131  private float minCostNeedBalance = 0.05f;
132
133  private List<CandidateGenerator> candidateGenerators;
134  private CostFromRegionLoadFunction[] regionLoadFunctions;
135  private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
136
137  // to save and report costs to JMX
138  private Double curOverallCost = 0d;
139  private Double[] tempFunctionCosts;
140  private Double[] curFunctionCosts;
141
142  // Keep locality based picker and cost function to alert them
143  // when new services are offered
144  private LocalityBasedCandidateGenerator localityCandidateGenerator;
145  private ServerLocalityCostFunction localityCost;
146  private RackLocalityCostFunction rackLocalityCost;
147  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
148  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
149  private boolean isByTable = false;
150  private TableName tableName = null;
151
152  /**
153   * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
154   * default MetricsBalancer
155   */
156  public StochasticLoadBalancer() {
157    super(new MetricsStochasticBalancer());
158  }
159
160  @Override
161  public void onConfigurationChange(Configuration conf) {
162    setConf(conf);
163  }
164
165  @Override
166  public synchronized void setConf(Configuration conf) {
167    super.setConf(conf);
168    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
169    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
170    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
171    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
172
173    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
174    isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
175    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
176    if (localityCandidateGenerator == null) {
177      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
178    }
179    localityCost = new ServerLocalityCostFunction(conf, services);
180    rackLocalityCost = new RackLocalityCostFunction(conf, services);
181
182    if (this.candidateGenerators == null) {
183      candidateGenerators = Lists.newArrayList();
184      candidateGenerators.add(new RandomCandidateGenerator());
185      candidateGenerators.add(new LoadCandidateGenerator());
186      candidateGenerators.add(localityCandidateGenerator);
187      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
188    }
189    regionLoadFunctions = new CostFromRegionLoadFunction[] {
190      new ReadRequestCostFunction(conf),
191      new CPRequestCostFunction(conf),
192      new WriteRequestCostFunction(conf),
193      new MemStoreSizeCostFunction(conf),
194      new StoreFileCostFunction(conf)
195    };
196    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
197    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
198    costFunctions = new CostFunction[]{
199      new RegionCountSkewCostFunction(conf),
200      new PrimaryRegionCountSkewCostFunction(conf),
201      new MoveCostFunction(conf),
202      localityCost,
203      rackLocalityCost,
204      new TableSkewCostFunction(conf),
205      regionReplicaHostCostFunction,
206      regionReplicaRackCostFunction,
207      regionLoadFunctions[0],
208      regionLoadFunctions[1],
209      regionLoadFunctions[2],
210      regionLoadFunctions[3],
211      regionLoadFunctions[4]
212    };
213    curFunctionCosts= new Double[costFunctions.length];
214    tempFunctionCosts= new Double[costFunctions.length];
215    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
216        ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
217  }
218
219  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
220    this.candidateGenerators = customCandidateGenerators;
221  }
222
223  @Override
224  protected void setSlop(Configuration conf) {
225    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
226  }
227
228  @Override
229  public synchronized void setClusterMetrics(ClusterMetrics st) {
230    super.setClusterMetrics(st);
231    updateRegionLoad();
232    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
233      cost.setClusterMetrics(st);
234    }
235
236    // update metrics size
237    try {
238      // by-table or ensemble mode
239      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
240      int functionsCount = getCostFunctionNames().length;
241
242      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
243    } catch (Exception e) {
244      LOG.error("failed to get the size of all tables", e);
245    }
246  }
247
248  /**
249   * Update the number of metrics that are reported to JMX
250   */
251  public void updateMetricsSize(int size) {
252    if (metricsBalancer instanceof MetricsStochasticBalancer) {
253        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
254    }
255  }
256
257  @Override
258  public synchronized void setMasterServices(MasterServices masterServices) {
259    super.setMasterServices(masterServices);
260    this.localityCost.setServices(masterServices);
261    this.rackLocalityCost.setServices(masterServices);
262    this.localityCandidateGenerator.setServices(masterServices);
263  }
264
265  @Override
266  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
267    regionReplicaHostCostFunction.init(c);
268    if (regionReplicaHostCostFunction.cost() > 0) return true;
269    regionReplicaRackCostFunction.init(c);
270    if (regionReplicaRackCostFunction.cost() > 0) return true;
271    return false;
272  }
273
274  @Override
275  protected boolean needsBalance(Cluster cluster) {
276    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
277    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
278      if (LOG.isDebugEnabled()) {
279        LOG.debug("Not running balancer because only " + cs.getNumServers()
280            + " active regionserver(s)");
281      }
282      return false;
283    }
284    if (areSomeRegionReplicasColocated(cluster)) {
285      return true;
286    }
287
288    double total = 0.0;
289    float sumMultiplier = 0.0f;
290    for (CostFunction c : costFunctions) {
291      float multiplier = c.getMultiplier();
292      if (multiplier <= 0) {
293        continue;
294      }
295      if (!c.isNeeded()) {
296        LOG.debug("{} not needed", c.getClass().getSimpleName());
297        continue;
298      }
299      sumMultiplier += multiplier;
300      total += c.cost() * multiplier;
301    }
302
303    if (total <= 0 || sumMultiplier <= 0
304        || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) {
305      if (LOG.isTraceEnabled()) {
306        LOG.trace("Skipping load balancing because balanced cluster; " + "total cost is " + total
307          + ", sum multiplier is " + sumMultiplier + " min cost which need balance is "
308          + minCostNeedBalance);
309      }
310      return false;
311    }
312    return true;
313  }
314
315  @Override
316  public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
317    List<RegionInfo>> clusterState) {
318    this.tableName = tableName;
319    return balanceCluster(clusterState);
320  }
321
322  @VisibleForTesting
323  Cluster.Action nextAction(Cluster cluster) {
324    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
325            .generate(cluster);
326  }
327
328  /**
329   * Given the cluster state this will try and approach an optimal balance. This
330   * should always approach the optimal state given enough steps.
331   */
332  @Override
333  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
334    List<RegionInfo>> clusterState) {
335    List<RegionPlan> plans = balanceMasterRegions(clusterState);
336    if (plans != null || clusterState == null || clusterState.size() <= 1) {
337      return plans;
338    }
339
340    if (masterServerName != null && clusterState.containsKey(masterServerName)) {
341      if (clusterState.size() <= 2) {
342        return null;
343      }
344      clusterState = new HashMap<>(clusterState);
345      clusterState.remove(masterServerName);
346    }
347
348    // On clusters with lots of HFileLinks or lots of reference files,
349    // instantiating the storefile infos can be quite expensive.
350    // Allow turning this feature off if the locality cost is not going to
351    // be used in any computations.
352    RegionLocationFinder finder = null;
353    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
354        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
355      finder = this.regionFinder;
356    }
357
358    //The clusterState that is given to this method contains the state
359    //of all the regions in the table(s) (that's true today)
360    // Keep track of servers to iterate through them.
361    Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
362
363    long startTime = EnvironmentEdgeManager.currentTime();
364
365    initCosts(cluster);
366
367    if (!needsBalance(cluster)) {
368      return null;
369    }
370
371    double currentCost = computeCost(cluster, Double.MAX_VALUE);
372    curOverallCost = currentCost;
373    for (int i = 0; i < this.curFunctionCosts.length; i++) {
374      curFunctionCosts[i] = tempFunctionCosts[i];
375    }
376    double initCost = currentCost;
377    double newCost = currentCost;
378
379    long computedMaxSteps;
380    if (runMaxSteps) {
381      computedMaxSteps = Math.max(this.maxSteps,
382          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
383    } else {
384      long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion *
385          (long)cluster.numServers;
386      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
387      if (calculatedMaxSteps > maxSteps) {
388        LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
389            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
390            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
391            + "(This config change does not require service restart)", calculatedMaxSteps,
392            maxRunningTime);
393      }
394    }
395    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
396        + functionCost() + " computedMaxSteps: " + computedMaxSteps);
397
398    // Perform a stochastic walk to see if we can get a good fit.
399    long step;
400
401    for (step = 0; step < computedMaxSteps; step++) {
402      Cluster.Action action = nextAction(cluster);
403
404      if (action.type == Type.NULL) {
405        continue;
406      }
407
408      cluster.doAction(action);
409      updateCostsWithAction(cluster, action);
410
411      newCost = computeCost(cluster, currentCost);
412
413      // Should this be kept?
414      if (newCost < currentCost) {
415        currentCost = newCost;
416
417        // save for JMX
418        curOverallCost = currentCost;
419        for (int i = 0; i < this.curFunctionCosts.length; i++) {
420          curFunctionCosts[i] = tempFunctionCosts[i];
421        }
422      } else {
423        // Put things back the way they were before.
424        // TODO: undo by remembering old values
425        Action undoAction = action.undoAction();
426        cluster.doAction(undoAction);
427        updateCostsWithAction(cluster, undoAction);
428      }
429
430      if (EnvironmentEdgeManager.currentTime() - startTime >
431          maxRunningTime) {
432        break;
433      }
434    }
435    long endTime = EnvironmentEdgeManager.currentTime();
436
437    metricsBalancer.balanceCluster(endTime - startTime);
438
439    // update costs metrics
440    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
441    if (initCost > currentCost) {
442      plans = createRegionPlans(cluster);
443      LOG.info("Finished computing new load balance plan. Computation took {}" +
444        " to try {} different iterations.  Found a solution that moves " +
445        "{} regions; Going from a computed cost of {}" +
446        " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime),
447        step, plans.size(), initCost, currentCost);
448      return plans;
449    }
450    LOG.info("Could not find a better load balance plan.  Tried {} different configurations in " +
451      "{}, and did not find anything with a computed cost less than {}", step,
452      java.time.Duration.ofMillis(endTime - startTime), initCost);
453    return null;
454  }
455
456  /**
457   * update costs to JMX
458   */
459  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
460    if (tableName == null) return;
461
462    // check if the metricsBalancer is MetricsStochasticBalancer before casting
463    if (metricsBalancer instanceof MetricsStochasticBalancer) {
464      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
465      // overall cost
466      balancer.updateStochasticCost(tableName.getNameAsString(),
467        "Overall", "Overall cost", overall);
468
469      // each cost function
470      for (int i = 0; i < costFunctions.length; i++) {
471        CostFunction costFunction = costFunctions[i];
472        String costFunctionName = costFunction.getClass().getSimpleName();
473        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
474        // TODO: cost function may need a specific description
475        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
476          "The percent of " + costFunctionName, costPercent);
477      }
478    }
479  }
480
481  private String functionCost() {
482    StringBuilder builder = new StringBuilder();
483    for (CostFunction c:costFunctions) {
484      builder.append(c.getClass().getSimpleName());
485      builder.append(" : (");
486      builder.append(c.getMultiplier());
487      builder.append(", ");
488      builder.append(c.cost());
489      builder.append("); ");
490    }
491    return builder.toString();
492  }
493
494  /**
495   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
496   * state.
497   *
498   * @param cluster The state of the cluster
499   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
500   */
501  private List<RegionPlan> createRegionPlans(Cluster cluster) {
502    List<RegionPlan> plans = new LinkedList<>();
503    for (int regionIndex = 0;
504         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
505      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
506      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
507
508      if (initialServerIndex != newServerIndex) {
509        RegionInfo region = cluster.regions[regionIndex];
510        ServerName initialServer = cluster.servers[initialServerIndex];
511        ServerName newServer = cluster.servers[newServerIndex];
512
513        if (LOG.isTraceEnabled()) {
514          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
515              + initialServer.getHostname() + " to " + newServer.getHostname());
516        }
517        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
518        plans.add(rp);
519      }
520    }
521    return plans;
522  }
523
524  /**
525   * Store the current region loads.
526   */
527  private synchronized void updateRegionLoad() {
528    // We create a new hashmap so that regions that are no longer there are removed.
529    // However we temporarily need the old loads so we can use them to keep the rolling average.
530    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
531    loads = new HashMap<>();
532
533    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
534      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
535        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
536        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
537        if (rLoads == null) {
538          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
539        } else if (rLoads.size() >= numRegionLoadsToRemember) {
540          rLoads.remove();
541        }
542        rLoads.add(new BalancerRegionLoad(rm));
543        loads.put(regionNameAsString, rLoads);
544      });
545    });
546
547    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
548      cost.setLoads(loads);
549    }
550  }
551
552  protected void initCosts(Cluster cluster) {
553    for (CostFunction c:costFunctions) {
554      c.init(cluster);
555    }
556  }
557
558  protected void updateCostsWithAction(Cluster cluster, Action action) {
559    for (CostFunction c : costFunctions) {
560      c.postAction(action);
561    }
562  }
563
564  /**
565   * Get the names of the cost functions
566   */
567  public String[] getCostFunctionNames() {
568    if (costFunctions == null) return null;
569    String[] ret = new String[costFunctions.length];
570    for (int i = 0; i < costFunctions.length; i++) {
571      CostFunction c = costFunctions[i];
572      ret[i] = c.getClass().getSimpleName();
573    }
574
575    return ret;
576  }
577
578  /**
579   * This is the main cost function.  It will compute a cost associated with a proposed cluster
580   * state.  All different costs will be combined with their multipliers to produce a double cost.
581   *
582   * @param cluster The state of the cluster
583   * @param previousCost the previous cost. This is used as an early out.
584   * @return a double of a cost associated with the proposed cluster state.  This cost is an
585   *         aggregate of all individual cost functions.
586   */
587  protected double computeCost(Cluster cluster, double previousCost) {
588    double total = 0;
589
590    for (int i = 0; i < costFunctions.length; i++) {
591      CostFunction c = costFunctions[i];
592      this.tempFunctionCosts[i] = 0.0;
593
594      if (c.getMultiplier() <= 0) {
595        continue;
596      }
597
598      Float multiplier = c.getMultiplier();
599      Double cost = c.cost();
600
601      this.tempFunctionCosts[i] = multiplier*cost;
602      total += this.tempFunctionCosts[i];
603
604      if (total > previousCost) {
605        break;
606      }
607    }
608
609    return total;
610  }
611
612  /** Generates a candidate action to be applied to the cluster for cost function search */
613  abstract static class CandidateGenerator {
614    abstract Cluster.Action generate(Cluster cluster);
615
616    /**
617     * From a list of regions pick a random one. Null can be returned which
618     * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
619     * rather than swap.
620     *
621     * @param cluster        The state of the cluster
622     * @param server         index of the server
623     * @param chanceOfNoSwap Chance that this will decide to try a move rather
624     *                       than a swap.
625     * @return a random {@link RegionInfo} or null if an asymmetrical move is
626     *         suggested.
627     */
628    protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
629      // Check to see if this is just a move.
630      if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
631        // signal a move only.
632        return -1;
633      }
634      int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
635      return cluster.regionsPerServer[server][rand];
636
637    }
638    protected int pickRandomServer(Cluster cluster) {
639      if (cluster.numServers < 1) {
640        return -1;
641      }
642
643      return RANDOM.nextInt(cluster.numServers);
644    }
645
646    protected int pickRandomRack(Cluster cluster) {
647      if (cluster.numRacks < 1) {
648        return -1;
649      }
650
651      return RANDOM.nextInt(cluster.numRacks);
652    }
653
654    protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
655      if (cluster.numServers < 2) {
656        return -1;
657      }
658      while (true) {
659        int otherServerIndex = pickRandomServer(cluster);
660        if (otherServerIndex != serverIndex) {
661          return otherServerIndex;
662        }
663      }
664    }
665
666    protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
667      if (cluster.numRacks < 2) {
668        return -1;
669      }
670      while (true) {
671        int otherRackIndex = pickRandomRack(cluster);
672        if (otherRackIndex != rackIndex) {
673          return otherRackIndex;
674        }
675      }
676    }
677
678    protected Cluster.Action pickRandomRegions(Cluster cluster,
679                                                       int thisServer,
680                                                       int otherServer) {
681      if (thisServer < 0 || otherServer < 0) {
682        return Cluster.NullAction;
683      }
684
685      // Decide who is most likely to need another region
686      int thisRegionCount = cluster.getNumRegions(thisServer);
687      int otherRegionCount = cluster.getNumRegions(otherServer);
688
689      // Assign the chance based upon the above
690      double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
691      double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
692
693      int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
694      int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
695
696      return getAction(thisServer, thisRegion, otherServer, otherRegion);
697    }
698
699    protected Cluster.Action getAction(int fromServer, int fromRegion,
700        int toServer, int toRegion) {
701      if (fromServer < 0 || toServer < 0) {
702        return Cluster.NullAction;
703      }
704      if (fromRegion > 0 && toRegion > 0) {
705        return new Cluster.SwapRegionsAction(fromServer, fromRegion,
706          toServer, toRegion);
707      } else if (fromRegion > 0) {
708        return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
709      } else if (toRegion > 0) {
710        return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
711      } else {
712        return Cluster.NullAction;
713      }
714    }
715
716    /**
717     * Returns a random iteration order of indexes of an array with size length
718     */
719    protected List<Integer> getRandomIterationOrder(int length) {
720      ArrayList<Integer> order = new ArrayList<>(length);
721      for (int i = 0; i < length; i++) {
722        order.add(i);
723      }
724      Collections.shuffle(order);
725      return order;
726    }
727  }
728
729  static class RandomCandidateGenerator extends CandidateGenerator {
730
731    @Override
732    Cluster.Action generate(Cluster cluster) {
733
734      int thisServer = pickRandomServer(cluster);
735
736      // Pick the other server
737      int otherServer = pickOtherRandomServer(cluster, thisServer);
738
739      return pickRandomRegions(cluster, thisServer, otherServer);
740    }
741  }
742
743  static class LoadCandidateGenerator extends CandidateGenerator {
744
745    @Override
746    Cluster.Action generate(Cluster cluster) {
747      cluster.sortServersByRegionCount();
748      int thisServer = pickMostLoadedServer(cluster, -1);
749      int otherServer = pickLeastLoadedServer(cluster, thisServer);
750
751      return pickRandomRegions(cluster, thisServer, otherServer);
752    }
753
754    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
755      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
756
757      int index = 0;
758      while (servers[index] == null || servers[index] == thisServer) {
759        index++;
760        if (index == servers.length) {
761          return -1;
762        }
763      }
764      return servers[index];
765    }
766
767    private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
768      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
769
770      int index = servers.length - 1;
771      while (servers[index] == null || servers[index] == thisServer) {
772        index--;
773        if (index < 0) {
774          return -1;
775        }
776      }
777      return servers[index];
778    }
779  }
780
781  static class LocalityBasedCandidateGenerator extends CandidateGenerator {
782
783    private MasterServices masterServices;
784
785    LocalityBasedCandidateGenerator(MasterServices masterServices) {
786      this.masterServices = masterServices;
787    }
788
789    @Override
790    Cluster.Action generate(Cluster cluster) {
791      if (this.masterServices == null) {
792        int thisServer = pickRandomServer(cluster);
793        // Pick the other server
794        int otherServer = pickOtherRandomServer(cluster, thisServer);
795        return pickRandomRegions(cluster, thisServer, otherServer);
796      }
797
798      // Randomly iterate through regions until you find one that is not on ideal host
799      for (int region : getRandomIterationOrder(cluster.numRegions)) {
800        int currentServer = cluster.regionIndexToServerIndex[region];
801        if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
802          Optional<Action> potential = tryMoveOrSwap(
803              cluster,
804              currentServer,
805              region,
806              cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
807          );
808          if (potential.isPresent()) {
809            return potential.get();
810          }
811        }
812      }
813      return Cluster.NullAction;
814    }
815
816    /**
817     * Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
818     * Returns empty optional if no move can be found
819     */
820    private Optional<Action> tryMoveOrSwap(Cluster cluster,
821                                           int fromServer,
822                                           int fromRegion,
823                                           int toServer) {
824      // Try move first. We know apriori fromRegion has the highest locality on toServer
825      if (cluster.serverHasTooFewRegions(toServer)) {
826        return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
827      }
828
829      // Compare locality gain/loss from swapping fromRegion with regions on toServer
830      double fromRegionLocalityDelta =
831          getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
832      for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
833        int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
834        double toRegionLocalityDelta =
835            getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
836        // If locality would remain neutral or improve, attempt the swap
837        if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
838          return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
839        }
840      }
841
842      return Optional.absent();
843    }
844
845    private double getWeightedLocality(Cluster cluster, int region, int server) {
846      return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
847    }
848
849    void setServices(MasterServices services) {
850      this.masterServices = services;
851    }
852  }
853
854  /**
855   * Generates candidates which moves the replicas out of the region server for
856   * co-hosted region replicas
857   */
858  static class RegionReplicaCandidateGenerator extends CandidateGenerator {
859
860    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
861
862    /**
863     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
864     * (a group is a server, host or rack)
865     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
866     * primariesOfRegionsPerHost or primariesOfRegionsPerRack
867     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
868     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
869     * @return a regionIndex for the selected primary or -1 if there is no co-locating
870     */
871    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
872        , int[] regionIndexToPrimaryIndex) {
873      int currentPrimary = -1;
874      int currentPrimaryIndex = -1;
875      int selectedPrimaryIndex = -1;
876      double currentLargestRandom = -1;
877      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
878      // ids for the regions hosted in server, a consecutive repetition means that replicas
879      // are co-hosted
880      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
881        int primary = j < primariesOfRegionsPerGroup.length
882            ? primariesOfRegionsPerGroup[j] : -1;
883        if (primary != currentPrimary) { // check for whether we see a new primary
884          int numReplicas = j - currentPrimaryIndex;
885          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
886            // decide to select this primary region id or not
887            double currentRandom = RANDOM.nextDouble();
888            // we don't know how many region replicas are co-hosted, we will randomly select one
889            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
890            if (currentRandom > currentLargestRandom) {
891              selectedPrimaryIndex = currentPrimary;
892              currentLargestRandom = currentRandom;
893            }
894          }
895          currentPrimary = primary;
896          currentPrimaryIndex = j;
897        }
898      }
899
900      // we have found the primary id for the region to move. Now find the actual regionIndex
901      // with the given primary, prefer to move the secondary region.
902      for (int j = 0; j < regionsPerGroup.length; j++) {
903        int regionIndex = regionsPerGroup[j];
904        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
905          // always move the secondary, not the primary
906          if (selectedPrimaryIndex != regionIndex) {
907            return regionIndex;
908          }
909        }
910      }
911      return -1;
912    }
913
914    @Override
915    Cluster.Action generate(Cluster cluster) {
916      int serverIndex = pickRandomServer(cluster);
917      if (cluster.numServers <= 1 || serverIndex == -1) {
918        return Cluster.NullAction;
919      }
920
921      int regionIndex = selectCoHostedRegionPerGroup(
922        cluster.primariesOfRegionsPerServer[serverIndex],
923        cluster.regionsPerServer[serverIndex],
924        cluster.regionIndexToPrimaryIndex);
925
926      // if there are no pairs of region replicas co-hosted, default to random generator
927      if (regionIndex == -1) {
928        // default to randompicker
929        return randomGenerator.generate(cluster);
930      }
931
932      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
933      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
934      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
935    }
936  }
937
938  /**
939   * Generates candidates which moves the replicas out of the rack for
940   * co-hosted region replicas in the same rack
941   */
942  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
943    @Override
944    Cluster.Action generate(Cluster cluster) {
945      int rackIndex = pickRandomRack(cluster);
946      if (cluster.numRacks <= 1 || rackIndex == -1) {
947        return super.generate(cluster);
948      }
949
950      int regionIndex = selectCoHostedRegionPerGroup(
951        cluster.primariesOfRegionsPerRack[rackIndex],
952        cluster.regionsPerRack[rackIndex],
953        cluster.regionIndexToPrimaryIndex);
954
955      // if there are no pairs of region replicas co-hosted, default to random generator
956      if (regionIndex == -1) {
957        // default to randompicker
958        return randomGenerator.generate(cluster);
959      }
960
961      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
962      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
963
964      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
965      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
966      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
967      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
968    }
969  }
970
971  /**
972   * Base class of StochasticLoadBalancer's Cost Functions.
973   */
974  abstract static class CostFunction {
975
976    private float multiplier = 0;
977
978    protected Cluster cluster;
979
980    CostFunction(Configuration c) {
981    }
982
983    boolean isNeeded() {
984      return true;
985    }
986    float getMultiplier() {
987      return multiplier;
988    }
989
990    void setMultiplier(float m) {
991      this.multiplier = m;
992    }
993
994    /** Called once per LB invocation to give the cost function
995     * to initialize it's state, and perform any costly calculation.
996     */
997    void init(Cluster cluster) {
998      this.cluster = cluster;
999    }
1000
1001    /** Called once per cluster Action to give the cost function
1002     * an opportunity to update it's state. postAction() is always
1003     * called at least once before cost() is called with the cluster
1004     * that this action is performed on. */
1005    void postAction(Action action) {
1006      switch (action.type) {
1007      case NULL: break;
1008      case ASSIGN_REGION:
1009        AssignRegionAction ar = (AssignRegionAction) action;
1010        regionMoved(ar.region, -1, ar.server);
1011        break;
1012      case MOVE_REGION:
1013        MoveRegionAction mra = (MoveRegionAction) action;
1014        regionMoved(mra.region, mra.fromServer, mra.toServer);
1015        break;
1016      case SWAP_REGIONS:
1017        SwapRegionsAction a = (SwapRegionsAction) action;
1018        regionMoved(a.fromRegion, a.fromServer, a.toServer);
1019        regionMoved(a.toRegion, a.toServer, a.fromServer);
1020        break;
1021      default:
1022        throw new RuntimeException("Uknown action:" + action.type);
1023      }
1024    }
1025
1026    protected void regionMoved(int region, int oldServer, int newServer) {
1027    }
1028
1029    abstract double cost();
1030
1031    /**
1032     * Function to compute a scaled cost using {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics}.
1033     * It assumes that this is a zero sum set of costs.  It assumes that the worst case
1034     * possible is all of the elements in one region server and the rest having 0.
1035     *
1036     * @param stats the costs
1037     * @return a scaled set of costs.
1038     */
1039    protected double costFromArray(double[] stats) {
1040      double totalCost = 0;
1041      double total = getSum(stats);
1042
1043      double count = stats.length;
1044      double mean = total/count;
1045
1046      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
1047      // a zero sum cost for this to make sense.
1048      double max = ((count - 1) * mean) + (total - mean);
1049
1050      // It's possible that there aren't enough regions to go around
1051      double min;
1052      if (count > total) {
1053        min = ((count - total) * mean) + ((1 - mean) * total);
1054      } else {
1055        // Some will have 1 more than everything else.
1056        int numHigh = (int) (total - (Math.floor(mean) * count));
1057        int numLow = (int) (count - numHigh);
1058
1059        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
1060
1061      }
1062      min = Math.max(0, min);
1063      for (int i=0; i<stats.length; i++) {
1064        double n = stats[i];
1065        double diff = Math.abs(mean - n);
1066        totalCost += diff;
1067      }
1068
1069      double scaled =  scale(min, max, totalCost);
1070      return scaled;
1071    }
1072
1073    private double getSum(double[] stats) {
1074      double total = 0;
1075      for(double s:stats) {
1076        total += s;
1077      }
1078      return total;
1079    }
1080
1081    /**
1082     * Scale the value between 0 and 1.
1083     *
1084     * @param min   Min value
1085     * @param max   The Max value
1086     * @param value The value to be scaled.
1087     * @return The scaled value.
1088     */
1089    protected double scale(double min, double max, double value) {
1090      if (max <= min || value <= min) {
1091        return 0;
1092      }
1093      if ((max - min) == 0) return 0;
1094
1095      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
1096    }
1097  }
1098
1099  /**
1100   * Given the starting state of the regions and a potential ending state
1101   * compute cost based upon the number of regions that have moved.
1102   */
1103  static class MoveCostFunction extends CostFunction {
1104    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
1105    private static final String MAX_MOVES_PERCENT_KEY =
1106        "hbase.master.balancer.stochastic.maxMovePercent";
1107    private static final float DEFAULT_MOVE_COST = 7;
1108    private static final int DEFAULT_MAX_MOVES = 600;
1109    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1110
1111    private final float maxMovesPercent;
1112
1113    MoveCostFunction(Configuration conf) {
1114      super(conf);
1115
1116      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
1117      // that large benefits are need to overcome the cost of a move.
1118      this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1119      // What percent of the number of regions a single run of the balancer can move.
1120      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1121    }
1122
1123    @Override
1124    double cost() {
1125      // Try and size the max number of Moves, but always be prepared to move some.
1126      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1127          DEFAULT_MAX_MOVES);
1128
1129      double moveCost = cluster.numMovedRegions;
1130
1131      // Don't let this single balance move more than the max moves.
1132      // This allows better scaling to accurately represent the actual cost of a move.
1133      if (moveCost > maxMoves) {
1134        return 1000000;   // return a number much greater than any of the other cost
1135      }
1136
1137      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
1138    }
1139  }
1140
1141  /**
1142   * Compute the cost of a potential cluster state from skew in number of
1143   * regions on a cluster.
1144   */
1145  static class RegionCountSkewCostFunction extends CostFunction {
1146    private static final String REGION_COUNT_SKEW_COST_KEY =
1147        "hbase.master.balancer.stochastic.regionCountCost";
1148    private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1149
1150    private double[] stats = null;
1151
1152    RegionCountSkewCostFunction(Configuration conf) {
1153      super(conf);
1154      // Load multiplier should be the greatest as it is the most general way to balance data.
1155      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1156    }
1157
1158    @Override
1159    double cost() {
1160      if (stats == null || stats.length != cluster.numServers) {
1161        stats = new double[cluster.numServers];
1162      }
1163
1164      for (int i =0; i < cluster.numServers; i++) {
1165        stats[i] = cluster.regionsPerServer[i].length;
1166      }
1167
1168      return costFromArray(stats);
1169    }
1170  }
1171
1172  /**
1173   * Compute the cost of a potential cluster state from skew in number of
1174   * primary regions on a cluster.
1175   */
1176  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1177    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1178        "hbase.master.balancer.stochastic.primaryRegionCountCost";
1179    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1180
1181    private double[] stats = null;
1182
1183    PrimaryRegionCountSkewCostFunction(Configuration conf) {
1184      super(conf);
1185      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1186      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1187        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1188    }
1189
1190    @Override
1191    double cost() {
1192      if (!cluster.hasRegionReplicas) {
1193        return 0;
1194      }
1195      if (stats == null || stats.length != cluster.numServers) {
1196        stats = new double[cluster.numServers];
1197      }
1198
1199      for (int i = 0; i < cluster.numServers; i++) {
1200        stats[i] = 0;
1201        for (int regionIdx : cluster.regionsPerServer[i]) {
1202          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1203            stats[i]++;
1204          }
1205        }
1206      }
1207
1208      return costFromArray(stats);
1209    }
1210  }
1211
1212  /**
1213   * Compute the cost of a potential cluster configuration based upon how evenly
1214   * distributed tables are.
1215   */
1216  static class TableSkewCostFunction extends CostFunction {
1217
1218    private static final String TABLE_SKEW_COST_KEY =
1219        "hbase.master.balancer.stochastic.tableSkewCost";
1220    private static final float DEFAULT_TABLE_SKEW_COST = 35;
1221
1222    TableSkewCostFunction(Configuration conf) {
1223      super(conf);
1224      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1225    }
1226
1227    @Override
1228    double cost() {
1229      double max = cluster.numRegions;
1230      double min = ((double) cluster.numRegions) / cluster.numServers;
1231      double value = 0;
1232
1233      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1234        value += cluster.numMaxRegionsPerTable[i];
1235      }
1236
1237      return scale(min, max, value);
1238    }
1239  }
1240
1241  /**
1242   * Compute a cost of a potential cluster configuration based upon where
1243   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
1244   */
1245  static abstract class LocalityBasedCostFunction extends CostFunction {
1246
1247    private final LocalityType type;
1248
1249    private double bestLocality; // best case locality across cluster weighted by local data size
1250    private double locality; // current locality across cluster weighted by local data size
1251
1252    private MasterServices services;
1253
1254    LocalityBasedCostFunction(Configuration conf,
1255                              MasterServices srv,
1256                              LocalityType type,
1257                              String localityCostKey,
1258                              float defaultLocalityCost) {
1259      super(conf);
1260      this.type = type;
1261      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1262      this.services = srv;
1263      this.locality = 0.0;
1264      this.bestLocality = 0.0;
1265    }
1266
1267    /**
1268     * Maps region to the current entity (server or rack) on which it is stored
1269     */
1270    abstract int regionIndexToEntityIndex(int region);
1271
1272    public void setServices(MasterServices srvc) {
1273      this.services = srvc;
1274    }
1275
1276    @Override
1277    void init(Cluster cluster) {
1278      super.init(cluster);
1279      locality = 0.0;
1280      bestLocality = 0.0;
1281
1282      // If no master, no computation will work, so assume 0 cost
1283      if (this.services == null) {
1284        return;
1285      }
1286
1287      for (int region = 0; region < cluster.numRegions; region++) {
1288        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1289        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1290      }
1291
1292      // We normalize locality to be a score between 0 and 1.0 representing how good it
1293      // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1294      // (and the cost is 0)
1295      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1296    }
1297
1298    @Override
1299    protected void regionMoved(int region, int oldServer, int newServer) {
1300      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1301      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1302      if (this.services == null) {
1303        return;
1304      }
1305      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1306      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1307      locality += normalizedDelta;
1308    }
1309
1310    @Override
1311    double cost() {
1312      return 1 - locality;
1313    }
1314
1315    private int getMostLocalEntityForRegion(int region) {
1316      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1317    }
1318
1319    private double getWeightedLocality(int region, int entity) {
1320      return cluster.getOrComputeWeightedLocality(region, entity, type);
1321    }
1322
1323  }
1324
1325  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1326
1327    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1328    private static final float DEFAULT_LOCALITY_COST = 25;
1329
1330    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1331      super(
1332          conf,
1333          srv,
1334          LocalityType.SERVER,
1335          LOCALITY_COST_KEY,
1336          DEFAULT_LOCALITY_COST
1337      );
1338    }
1339
1340    @Override
1341    int regionIndexToEntityIndex(int region) {
1342      return cluster.regionIndexToServerIndex[region];
1343    }
1344  }
1345
1346  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1347
1348    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1349    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1350
1351    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1352      super(
1353          conf,
1354          services,
1355          LocalityType.RACK,
1356          RACK_LOCALITY_COST_KEY,
1357          DEFAULT_RACK_LOCALITY_COST
1358      );
1359    }
1360
1361    @Override
1362    int regionIndexToEntityIndex(int region) {
1363      return cluster.getRackForRegion(region);
1364    }
1365  }
1366
1367  /**
1368   * Base class the allows writing costs functions from rolling average of some
1369   * number from RegionLoad.
1370   */
1371  abstract static class CostFromRegionLoadFunction extends CostFunction {
1372
1373    private ClusterMetrics clusterStatus = null;
1374    private Map<String, Deque<BalancerRegionLoad>> loads = null;
1375    private double[] stats = null;
1376    CostFromRegionLoadFunction(Configuration conf) {
1377      super(conf);
1378    }
1379
1380    void setClusterMetrics(ClusterMetrics status) {
1381      this.clusterStatus = status;
1382    }
1383
1384    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1385      this.loads = l;
1386    }
1387
1388    @Override
1389    double cost() {
1390      if (clusterStatus == null || loads == null) {
1391        return 0;
1392      }
1393
1394      if (stats == null || stats.length != cluster.numServers) {
1395        stats = new double[cluster.numServers];
1396      }
1397
1398      for (int i =0; i < stats.length; i++) {
1399        //Cost this server has from RegionLoad
1400        long cost = 0;
1401
1402        // for every region on this server get the rl
1403        for(int regionIndex:cluster.regionsPerServer[i]) {
1404          Collection<BalancerRegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1405
1406          // Now if we found a region load get the type of cost that was requested.
1407          if (regionLoadList != null) {
1408            cost = (long) (cost + getRegionLoadCost(regionLoadList));
1409          }
1410        }
1411
1412        // Add the total cost to the stats.
1413        stats[i] = cost;
1414      }
1415
1416      // Now return the scaled cost from data held in the stats object.
1417      return costFromArray(stats);
1418    }
1419
1420    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1421      double cost = 0;
1422      for (BalancerRegionLoad rl : regionLoadList) {
1423        cost += getCostFromRl(rl);
1424      }
1425      return cost / regionLoadList.size();
1426    }
1427
1428    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1429  }
1430
1431  /**
1432   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
1433   * We do not compare about the actual rate in requests per second but rather the rate relative
1434   * to the rest of the regions.
1435   */
1436  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1437
1438    CostFromRegionLoadAsRateFunction(Configuration conf) {
1439      super(conf);
1440    }
1441
1442    @Override
1443    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1444      double cost = 0;
1445      double previous = 0;
1446      boolean isFirst = true;
1447      for (BalancerRegionLoad rl : regionLoadList) {
1448        double current = getCostFromRl(rl);
1449        if (isFirst) {
1450          isFirst = false;
1451        } else {
1452          cost += current - previous;
1453        }
1454        previous = current;
1455      }
1456      return Math.max(0, cost / (regionLoadList.size() - 1));
1457    }
1458  }
1459
1460  /**
1461   * Compute the cost of total number of read requests  The more unbalanced the higher the
1462   * computed cost will be.  This uses a rolling average of regionload.
1463   */
1464
1465  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1466
1467    private static final String READ_REQUEST_COST_KEY =
1468        "hbase.master.balancer.stochastic.readRequestCost";
1469    private static final float DEFAULT_READ_REQUEST_COST = 5;
1470
1471    ReadRequestCostFunction(Configuration conf) {
1472      super(conf);
1473      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1474    }
1475
1476    @Override
1477    protected double getCostFromRl(BalancerRegionLoad rl) {
1478      return rl.getReadRequestsCount();
1479    }
1480  }
1481
1482  /**
1483   * Compute the cost of total number of coprocessor requests  The more unbalanced the higher the
1484   * computed cost will be.  This uses a rolling average of regionload.
1485   */
1486
1487  static class CPRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1488
1489    private static final String CP_REQUEST_COST_KEY =
1490        "hbase.master.balancer.stochastic.cpRequestCost";
1491    private static final float DEFAULT_CP_REQUEST_COST = 5;
1492
1493    CPRequestCostFunction(Configuration conf) {
1494      super(conf);
1495      this.setMultiplier(conf.getFloat(CP_REQUEST_COST_KEY, DEFAULT_CP_REQUEST_COST));
1496    }
1497
1498    @Override
1499    protected double getCostFromRl(BalancerRegionLoad rl) {
1500      return rl.getCpRequestsCount();
1501    }
1502  }
1503
1504  /**
1505   * Compute the cost of total number of write requests.  The more unbalanced the higher the
1506   * computed cost will be.  This uses a rolling average of regionload.
1507   */
1508  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1509
1510    private static final String WRITE_REQUEST_COST_KEY =
1511        "hbase.master.balancer.stochastic.writeRequestCost";
1512    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1513
1514    WriteRequestCostFunction(Configuration conf) {
1515      super(conf);
1516      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1517    }
1518
1519    @Override
1520    protected double getCostFromRl(BalancerRegionLoad rl) {
1521      return rl.getWriteRequestsCount();
1522    }
1523  }
1524
1525  /**
1526   * A cost function for region replicas. We give a very high cost to hosting
1527   * replicas of the same region in the same host. We do not prevent the case
1528   * though, since if numReplicas > numRegionServers, we still want to keep the
1529   * replica open.
1530   */
1531  static class RegionReplicaHostCostFunction extends CostFunction {
1532    private static final String REGION_REPLICA_HOST_COST_KEY =
1533        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1534    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1535
1536    long maxCost = 0;
1537    long[] costsPerGroup; // group is either server, host or rack
1538    int[][] primariesOfRegionsPerGroup;
1539
1540    public RegionReplicaHostCostFunction(Configuration conf) {
1541      super(conf);
1542      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1543        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1544    }
1545
1546    @Override
1547    void init(Cluster cluster) {
1548      super.init(cluster);
1549      // max cost is the case where every region replica is hosted together regardless of host
1550      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1551      costsPerGroup = new long[cluster.numHosts];
1552      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1553          ? cluster.primariesOfRegionsPerHost
1554          : cluster.primariesOfRegionsPerServer;
1555      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1556        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1557      }
1558    }
1559
1560    long getMaxCost(Cluster cluster) {
1561      if (!cluster.hasRegionReplicas) {
1562        return 0; // short circuit
1563      }
1564      // max cost is the case where every region replica is hosted together regardless of host
1565      int[] primariesOfRegions = new int[cluster.numRegions];
1566      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1567          cluster.regions.length);
1568
1569      Arrays.sort(primariesOfRegions);
1570
1571      // compute numReplicas from the sorted array
1572      return costPerGroup(primariesOfRegions);
1573    }
1574
1575    @Override
1576    boolean isNeeded() {
1577      return cluster.hasRegionReplicas;
1578    }
1579
1580    @Override
1581    double cost() {
1582      if (maxCost <= 0) {
1583        return 0;
1584      }
1585
1586      long totalCost = 0;
1587      for (int i = 0 ; i < costsPerGroup.length; i++) {
1588        totalCost += costsPerGroup[i];
1589      }
1590      return scale(0, maxCost, totalCost);
1591    }
1592
1593    /**
1594     * For each primary region, it computes the total number of replicas in the array (numReplicas)
1595     * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1596     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1597     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1598     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1599     * @return a sum of numReplicas-1 squared for each primary region in the group.
1600     */
1601    protected long costPerGroup(int[] primariesOfRegions) {
1602      long cost = 0;
1603      int currentPrimary = -1;
1604      int currentPrimaryIndex = -1;
1605      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1606      // sharing the same primary will have consecutive numbers in the array.
1607      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1608        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1609        if (primary != currentPrimary) { // we see a new primary
1610          int numReplicas = j - currentPrimaryIndex;
1611          // square the cost
1612          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1613            cost += (numReplicas - 1) * (numReplicas - 1);
1614          }
1615          currentPrimary = primary;
1616          currentPrimaryIndex = j;
1617        }
1618      }
1619
1620      return cost;
1621    }
1622
1623    @Override
1624    protected void regionMoved(int region, int oldServer, int newServer) {
1625      if (maxCost <= 0) {
1626        return; // no need to compute
1627      }
1628      if (cluster.multiServersPerHost) {
1629        int oldHost = cluster.serverIndexToHostIndex[oldServer];
1630        int newHost = cluster.serverIndexToHostIndex[newServer];
1631        if (newHost != oldHost) {
1632          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1633          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1634        }
1635      } else {
1636        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1637        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1638      }
1639    }
1640  }
1641
1642  /**
1643   * A cost function for region replicas for the rack distribution. We give a relatively high
1644   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1645   * though.
1646   */
1647  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1648    private static final String REGION_REPLICA_RACK_COST_KEY =
1649        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1650    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1651
1652    public RegionReplicaRackCostFunction(Configuration conf) {
1653      super(conf);
1654      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1655        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1656    }
1657
1658    @Override
1659    void init(Cluster cluster) {
1660      this.cluster = cluster;
1661      if (cluster.numRacks <= 1) {
1662        maxCost = 0;
1663        return; // disabled for 1 rack
1664      }
1665      // max cost is the case where every region replica is hosted together regardless of rack
1666      maxCost = getMaxCost(cluster);
1667      costsPerGroup = new long[cluster.numRacks];
1668      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1669        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1670      }
1671    }
1672
1673    @Override
1674    protected void regionMoved(int region, int oldServer, int newServer) {
1675      if (maxCost <= 0) {
1676        return; // no need to compute
1677      }
1678      int oldRack = cluster.serverIndexToRackIndex[oldServer];
1679      int newRack = cluster.serverIndexToRackIndex[newServer];
1680      if (newRack != oldRack) {
1681        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1682        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1683      }
1684    }
1685  }
1686
1687  /**
1688   * Compute the cost of total memstore size.  The more unbalanced the higher the
1689   * computed cost will be.  This uses a rolling average of regionload.
1690   */
1691  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
1692
1693    private static final String MEMSTORE_SIZE_COST_KEY =
1694        "hbase.master.balancer.stochastic.memstoreSizeCost";
1695    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1696
1697    MemStoreSizeCostFunction(Configuration conf) {
1698      super(conf);
1699      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1700    }
1701
1702    @Override
1703    protected double getCostFromRl(BalancerRegionLoad rl) {
1704      return rl.getMemStoreSizeMB();
1705    }
1706  }
1707
1708  /**
1709   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1710   * computed cost will be.  This uses a rolling average of regionload.
1711   */
1712  static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1713
1714    private static final String STOREFILE_SIZE_COST_KEY =
1715        "hbase.master.balancer.stochastic.storefileSizeCost";
1716    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1717
1718    StoreFileCostFunction(Configuration conf) {
1719      super(conf);
1720      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1721    }
1722
1723    @Override
1724    protected double getCostFromRl(BalancerRegionLoad rl) {
1725      return rl.getStorefileSizeMB();
1726    }
1727  }
1728
1729  /**
1730   * A helper function to compose the attribute name from tablename and costfunction name
1731   */
1732  public static String composeAttributeName(String tableName, String costFunctionName) {
1733    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1734  }
1735}