001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Random;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.HBaseInterfaceAudience;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.RegionMetrics;
036import org.apache.hadoop.hbase.ServerMetrics;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.master.MasterServices;
041import org.apache.hadoop.hbase.master.RegionPlan;
042import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
054import org.apache.hbase.thirdparty.com.google.common.base.Optional;
055import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
056
057
058/**
 * <p>This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
062 * <ul>
063 * <li>Region Load</li>
064 * <li>Table Load</li>
065 * <li>Data Locality</li>
066 * <li>Memstore Sizes</li>
067 * <li>Storefile Sizes</li>
068 * </ul>
069 *
070 *
071 * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
072 * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
073 * scaled by their respective multipliers:</p>
074 *
075 * <ul>
076 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
077 *   <li>hbase.master.balancer.stochastic.moveCost</li>
078 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
079 *   <li>hbase.master.balancer.stochastic.localityCost</li>
080 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
081 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
082 * </ul>
083 *
084 * <p>In addition to the above configurations, the balancer can be tuned by the following
085 * configuration values:</p>
086 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions, which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
090 *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
091 *   regions is multiplied to try and get the number of times the balancer will
092 *   mutate all servers.</li>
 *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
 *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
 *   value and the above computation, or the maximum of the two when
 *   hbase.master.balancer.stochastic.runMaxSteps is set to true.</li>
096 * </ul>
097 *
098 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
099 * so that the balancer gets the full picture of all loads on the cluster.</p>
100 */
101@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
102@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
103  justification="Complaint is about costFunctions not being synchronized; not end of the world")
104public class StochasticLoadBalancer extends BaseLoadBalancer {
105
  /** Config key: per-region coefficient used when computing the random-walk step budget. */
  protected static final String STEPS_PER_REGION_KEY =
      "hbase.master.balancer.stochastic.stepsPerRegion";
  /**
   * Config key: cap on the number of random-walk steps per balanceCluster call (used as a
   * floor instead when runMaxSteps is true).
   */
  protected static final String MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.maxSteps";
  /** Config key: when true, the step budget is max(maxSteps, computed) instead of min. */
  protected static final String RUN_MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.runMaxSteps";
  /** Config key: wall-clock limit, in milliseconds, for one balanceCluster random walk. */
  protected static final String MAX_RUNNING_TIME_KEY =
      "hbase.master.balancer.stochastic.maxRunningTime";
  /** Config key: number of historical load samples kept per region. */
  protected static final String KEEP_REGION_LOADS =
      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  // NOTE(review): not referenced in the visible portion of this file.
  private static final String TABLE_FUNCTION_SEP = "_";
  /**
   * Config key: minimum weighted-average cost (total cost / sum of multipliers) below which
   * needsBalance() reports that no balancing is required.
   */
  protected static final String MIN_COST_NEED_BALANCE_KEY =
      "hbase.master.balancer.stochastic.minCostNeedBalance";

  // Shared randomness for generator selection and the candidate generators.
  protected static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Recent load samples per region, keyed by region name string; rebuilt by updateRegionLoad().
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // Defaults; setConf() replaces each one when the corresponding key is configured.
  private int maxSteps = 1000000;
  private boolean runMaxSteps = false;
  private int stepsPerRegion = 800;
  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
  private int numRegionLoadsToRemember = 15;
  private float minCostNeedBalance = 0.05f;

  private List<CandidateGenerator> candidateGenerators;
  // The cost functions fed with region load history (also members of costFunctions).
  private CostFromRegionLoadFunction[] regionLoadFunctions;
  private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

  // to save and report costs to JMX
  private Double curOverallCost = 0d;
  // Weighted per-function costs written by the most recent computeCost() call.
  private Double[] tempFunctionCosts;
  // Weighted per-function costs of the best state accepted so far; reported to JMX.
  private Double[] curFunctionCosts;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
  // Read from HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE in setConf().
  private boolean isByTable = false;
  // Table being balanced; set by balanceCluster(TableName, Map) and used for JMX reporting.
  private TableName tableName = null;
151
152  /**
153   * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
154   * default MetricsBalancer
155   */
156  public StochasticLoadBalancer() {
157    super(new MetricsStochasticBalancer());
158  }
159
160  @Override
161  public void onConfigurationChange(Configuration conf) {
162    setConf(conf);
163  }
164
165  @Override
166  public synchronized void setConf(Configuration conf) {
167    super.setConf(conf);
168    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
169    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
170    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
171    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
172
173    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
174    isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
175    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
176    if (localityCandidateGenerator == null) {
177      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
178    }
179    localityCost = new ServerLocalityCostFunction(conf, services);
180    rackLocalityCost = new RackLocalityCostFunction(conf, services);
181
182    if (this.candidateGenerators == null) {
183      candidateGenerators = Lists.newArrayList();
184      candidateGenerators.add(new RandomCandidateGenerator());
185      candidateGenerators.add(new LoadCandidateGenerator());
186      candidateGenerators.add(localityCandidateGenerator);
187      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
188    }
189    regionLoadFunctions = new CostFromRegionLoadFunction[] {
190      new ReadRequestCostFunction(conf),
191      new WriteRequestCostFunction(conf),
192      new MemStoreSizeCostFunction(conf),
193      new StoreFileCostFunction(conf)
194    };
195    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
196    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
197    costFunctions = new CostFunction[]{
198      new RegionCountSkewCostFunction(conf),
199      new PrimaryRegionCountSkewCostFunction(conf),
200      new MoveCostFunction(conf),
201      localityCost,
202      rackLocalityCost,
203      new TableSkewCostFunction(conf),
204      regionReplicaHostCostFunction,
205      regionReplicaRackCostFunction,
206      regionLoadFunctions[0],
207      regionLoadFunctions[1],
208      regionLoadFunctions[2],
209      regionLoadFunctions[3],
210    };
211    curFunctionCosts= new Double[costFunctions.length];
212    tempFunctionCosts= new Double[costFunctions.length];
213    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
214        ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
215  }
216
217  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
218    this.candidateGenerators = customCandidateGenerators;
219  }
220
221  @Override
222  protected void setSlop(Configuration conf) {
223    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
224  }
225
226  @Override
227  public synchronized void setClusterMetrics(ClusterMetrics st) {
228    super.setClusterMetrics(st);
229    updateRegionLoad();
230    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
231      cost.setClusterMetrics(st);
232    }
233
234    // update metrics size
235    try {
236      // by-table or ensemble mode
237      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
238      int functionsCount = getCostFunctionNames().length;
239
240      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
241    } catch (Exception e) {
242      LOG.error("failed to get the size of all tables", e);
243    }
244  }
245
246  /**
247   * Update the number of metrics that are reported to JMX
248   */
249  public void updateMetricsSize(int size) {
250    if (metricsBalancer instanceof MetricsStochasticBalancer) {
251        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
252    }
253  }
254
255  @Override
256  public synchronized void setMasterServices(MasterServices masterServices) {
257    super.setMasterServices(masterServices);
258    this.localityCost.setServices(masterServices);
259    this.rackLocalityCost.setServices(masterServices);
260    this.localityCandidateGenerator.setServices(masterServices);
261  }
262
263  @Override
264  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
265    regionReplicaHostCostFunction.init(c);
266    if (regionReplicaHostCostFunction.cost() > 0) return true;
267    regionReplicaRackCostFunction.init(c);
268    if (regionReplicaRackCostFunction.cost() > 0) return true;
269    return false;
270  }
271
272  @Override
273  protected boolean needsBalance(Cluster cluster) {
274    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
275    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
276      if (LOG.isDebugEnabled()) {
277        LOG.debug("Not running balancer because only " + cs.getNumServers()
278            + " active regionserver(s)");
279      }
280      return false;
281    }
282    if (areSomeRegionReplicasColocated(cluster)) {
283      return true;
284    }
285
286    double total = 0.0;
287    float sumMultiplier = 0.0f;
288    for (CostFunction c : costFunctions) {
289      float multiplier = c.getMultiplier();
290      if (multiplier <= 0) {
291        continue;
292      }
293      if (!c.isNeeded()) {
294        LOG.debug("{} not needed", c.getClass().getSimpleName());
295        continue;
296      }
297      sumMultiplier += multiplier;
298      total += c.cost() * multiplier;
299    }
300
301    if (total <= 0 || sumMultiplier <= 0
302        || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) {
303      if (LOG.isTraceEnabled()) {
304        final String loadBalanceTarget =
305            isByTable ? String.format("table (%s)", tableName) : "cluster";
306        LOG.trace("Skipping load balancing because the {} is balanced. Total cost: {}, "
307            + "Sum multiplier: {}, Minimum cost needed for balance: {}", loadBalanceTarget, total,
308            sumMultiplier, minCostNeedBalance);
309      }
310      return false;
311    }
312    return true;
313  }
314
315  @Override
316  public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
317    List<RegionInfo>> clusterState) {
318    this.tableName = tableName;
319    return balanceCluster(clusterState);
320  }
321
322  @VisibleForTesting
323  Cluster.Action nextAction(Cluster cluster) {
324    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
325            .generate(cluster);
326  }
327
328  /**
329   * Given the cluster state this will try and approach an optimal balance. This
330   * should always approach the optimal state given enough steps.
331   */
332  @Override
333  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
334    List<RegionInfo>> clusterState) {
335    List<RegionPlan> plans = balanceMasterRegions(clusterState);
336    if (plans != null || clusterState == null || clusterState.size() <= 1) {
337      return plans;
338    }
339
340    if (masterServerName != null && clusterState.containsKey(masterServerName)) {
341      if (clusterState.size() <= 2) {
342        return null;
343      }
344      clusterState = new HashMap<>(clusterState);
345      clusterState.remove(masterServerName);
346    }
347
348    // On clusters with lots of HFileLinks or lots of reference files,
349    // instantiating the storefile infos can be quite expensive.
350    // Allow turning this feature off if the locality cost is not going to
351    // be used in any computations.
352    RegionLocationFinder finder = null;
353    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
354        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
355      finder = this.regionFinder;
356    }
357
358    //The clusterState that is given to this method contains the state
359    //of all the regions in the table(s) (that's true today)
360    // Keep track of servers to iterate through them.
361    Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
362
363    long startTime = EnvironmentEdgeManager.currentTime();
364
365    initCosts(cluster);
366
367    if (!needsBalance(cluster)) {
368      return null;
369    }
370
371    double currentCost = computeCost(cluster, Double.MAX_VALUE);
372    curOverallCost = currentCost;
373    for (int i = 0; i < this.curFunctionCosts.length; i++) {
374      curFunctionCosts[i] = tempFunctionCosts[i];
375    }
376    double initCost = currentCost;
377    double newCost = currentCost;
378
379    long computedMaxSteps;
380    if (runMaxSteps) {
381      computedMaxSteps = Math.max(this.maxSteps,
382          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
383    } else {
384      long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion *
385          (long)cluster.numServers;
386      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
387      if (calculatedMaxSteps > maxSteps) {
388        LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
389            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
390            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
391            + "(This config change does not require service restart)", calculatedMaxSteps,
392            maxRunningTime);
393      }
394    }
395    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
396        + functionCost() + " computedMaxSteps: " + computedMaxSteps);
397
398    // Perform a stochastic walk to see if we can get a good fit.
399    long step;
400
401    for (step = 0; step < computedMaxSteps; step++) {
402      Cluster.Action action = nextAction(cluster);
403
404      if (action.type == Type.NULL) {
405        continue;
406      }
407
408      cluster.doAction(action);
409      updateCostsWithAction(cluster, action);
410
411      newCost = computeCost(cluster, currentCost);
412
413      // Should this be kept?
414      if (newCost < currentCost) {
415        currentCost = newCost;
416
417        // save for JMX
418        curOverallCost = currentCost;
419        for (int i = 0; i < this.curFunctionCosts.length; i++) {
420          curFunctionCosts[i] = tempFunctionCosts[i];
421        }
422      } else {
423        // Put things back the way they were before.
424        // TODO: undo by remembering old values
425        Action undoAction = action.undoAction();
426        cluster.doAction(undoAction);
427        updateCostsWithAction(cluster, undoAction);
428      }
429
430      if (EnvironmentEdgeManager.currentTime() - startTime >
431          maxRunningTime) {
432        break;
433      }
434    }
435    long endTime = EnvironmentEdgeManager.currentTime();
436
437    metricsBalancer.balanceCluster(endTime - startTime);
438
439    // update costs metrics
440    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
441    if (initCost > currentCost) {
442      plans = createRegionPlans(cluster);
443      LOG.info("Finished computing new load balance plan. Computation took {}" +
444        " to try {} different iterations.  Found a solution that moves " +
445        "{} regions; Going from a computed cost of {}" +
446        " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime),
447        step, plans.size(), initCost, currentCost);
448      return plans;
449    }
450    LOG.info("Could not find a better load balance plan.  Tried {} different configurations in " +
451      "{}, and did not find anything with a computed cost less than {}", step,
452      java.time.Duration.ofMillis(endTime - startTime), initCost);
453    return null;
454  }
455
456  /**
457   * update costs to JMX
458   */
459  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
460    if (tableName == null) return;
461
462    // check if the metricsBalancer is MetricsStochasticBalancer before casting
463    if (metricsBalancer instanceof MetricsStochasticBalancer) {
464      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
465      // overall cost
466      balancer.updateStochasticCost(tableName.getNameAsString(),
467        "Overall", "Overall cost", overall);
468
469      // each cost function
470      for (int i = 0; i < costFunctions.length; i++) {
471        CostFunction costFunction = costFunctions[i];
472        String costFunctionName = costFunction.getClass().getSimpleName();
473        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
474        // TODO: cost function may need a specific description
475        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
476          "The percent of " + costFunctionName, costPercent);
477      }
478    }
479  }
480
481  private String functionCost() {
482    StringBuilder builder = new StringBuilder();
483    for (CostFunction c:costFunctions) {
484      builder.append(c.getClass().getSimpleName());
485      builder.append(" : (");
486      builder.append(c.getMultiplier());
487      builder.append(", ");
488      builder.append(c.cost());
489      builder.append("); ");
490    }
491    return builder.toString();
492  }
493
494  /**
495   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
496   * state.
497   *
498   * @param cluster The state of the cluster
499   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
500   */
501  private List<RegionPlan> createRegionPlans(Cluster cluster) {
502    List<RegionPlan> plans = new LinkedList<>();
503    for (int regionIndex = 0;
504         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
505      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
506      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
507
508      if (initialServerIndex != newServerIndex) {
509        RegionInfo region = cluster.regions[regionIndex];
510        ServerName initialServer = cluster.servers[initialServerIndex];
511        ServerName newServer = cluster.servers[newServerIndex];
512
513        if (LOG.isTraceEnabled()) {
514          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
515              + initialServer.getHostname() + " to " + newServer.getHostname());
516        }
517        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
518        plans.add(rp);
519      }
520    }
521    return plans;
522  }
523
524  /**
525   * Store the current region loads.
526   */
527  private synchronized void updateRegionLoad() {
528    // We create a new hashmap so that regions that are no longer there are removed.
529    // However we temporarily need the old loads so we can use them to keep the rolling average.
530    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
531    loads = new HashMap<>();
532
533    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
534      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
535        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
536        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
537        if (rLoads == null) {
538          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
539        } else if (rLoads.size() >= numRegionLoadsToRemember) {
540          rLoads.remove();
541        }
542        rLoads.add(new BalancerRegionLoad(rm));
543        loads.put(regionNameAsString, rLoads);
544      });
545    });
546
547    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
548      cost.setLoads(loads);
549    }
550  }
551
552  protected void initCosts(Cluster cluster) {
553    for (CostFunction c:costFunctions) {
554      c.init(cluster);
555    }
556  }
557
558  protected void updateCostsWithAction(Cluster cluster, Action action) {
559    for (CostFunction c : costFunctions) {
560      c.postAction(action);
561    }
562  }
563
564  /**
565   * Get the names of the cost functions
566   */
567  public String[] getCostFunctionNames() {
568    if (costFunctions == null) return null;
569    String[] ret = new String[costFunctions.length];
570    for (int i = 0; i < costFunctions.length; i++) {
571      CostFunction c = costFunctions[i];
572      ret[i] = c.getClass().getSimpleName();
573    }
574
575    return ret;
576  }
577
578  /**
579   * This is the main cost function.  It will compute a cost associated with a proposed cluster
580   * state.  All different costs will be combined with their multipliers to produce a double cost.
581   *
582   * @param cluster The state of the cluster
583   * @param previousCost the previous cost. This is used as an early out.
584   * @return a double of a cost associated with the proposed cluster state.  This cost is an
585   *         aggregate of all individual cost functions.
586   */
587  protected double computeCost(Cluster cluster, double previousCost) {
588    double total = 0;
589
590    for (int i = 0; i < costFunctions.length; i++) {
591      CostFunction c = costFunctions[i];
592      this.tempFunctionCosts[i] = 0.0;
593
594      if (c.getMultiplier() <= 0) {
595        continue;
596      }
597
598      Float multiplier = c.getMultiplier();
599      Double cost = c.cost();
600
601      this.tempFunctionCosts[i] = multiplier*cost;
602      total += this.tempFunctionCosts[i];
603
604      if (total > previousCost) {
605        break;
606      }
607    }
608
609    return total;
610  }
611
612  /** Generates a candidate action to be applied to the cluster for cost function search */
613  abstract static class CandidateGenerator {
614    abstract Cluster.Action generate(Cluster cluster);
615
616    /**
617     * From a list of regions pick a random one. Null can be returned which
618     * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
619     * rather than swap.
620     *
621     * @param cluster        The state of the cluster
622     * @param server         index of the server
623     * @param chanceOfNoSwap Chance that this will decide to try a move rather
624     *                       than a swap.
625     * @return a random {@link RegionInfo} or null if an asymmetrical move is
626     *         suggested.
627     */
628    protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
629      // Check to see if this is just a move.
630      if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
631        // signal a move only.
632        return -1;
633      }
634      int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
635      return cluster.regionsPerServer[server][rand];
636
637    }
638    protected int pickRandomServer(Cluster cluster) {
639      if (cluster.numServers < 1) {
640        return -1;
641      }
642
643      return RANDOM.nextInt(cluster.numServers);
644    }
645
646    protected int pickRandomRack(Cluster cluster) {
647      if (cluster.numRacks < 1) {
648        return -1;
649      }
650
651      return RANDOM.nextInt(cluster.numRacks);
652    }
653
654    protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
655      if (cluster.numServers < 2) {
656        return -1;
657      }
658      while (true) {
659        int otherServerIndex = pickRandomServer(cluster);
660        if (otherServerIndex != serverIndex) {
661          return otherServerIndex;
662        }
663      }
664    }
665
666    protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
667      if (cluster.numRacks < 2) {
668        return -1;
669      }
670      while (true) {
671        int otherRackIndex = pickRandomRack(cluster);
672        if (otherRackIndex != rackIndex) {
673          return otherRackIndex;
674        }
675      }
676    }
677
678    protected Cluster.Action pickRandomRegions(Cluster cluster,
679                                                       int thisServer,
680                                                       int otherServer) {
681      if (thisServer < 0 || otherServer < 0) {
682        return Cluster.NullAction;
683      }
684
685      // Decide who is most likely to need another region
686      int thisRegionCount = cluster.getNumRegions(thisServer);
687      int otherRegionCount = cluster.getNumRegions(otherServer);
688
689      // Assign the chance based upon the above
690      double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
691      double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
692
693      int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
694      int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
695
696      return getAction(thisServer, thisRegion, otherServer, otherRegion);
697    }
698
699    protected Cluster.Action getAction(int fromServer, int fromRegion,
700        int toServer, int toRegion) {
701      if (fromServer < 0 || toServer < 0) {
702        return Cluster.NullAction;
703      }
704      if (fromRegion > 0 && toRegion > 0) {
705        return new Cluster.SwapRegionsAction(fromServer, fromRegion,
706          toServer, toRegion);
707      } else if (fromRegion > 0) {
708        return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
709      } else if (toRegion > 0) {
710        return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
711      } else {
712        return Cluster.NullAction;
713      }
714    }
715
716    /**
717     * Returns a random iteration order of indexes of an array with size length
718     */
719    protected List<Integer> getRandomIterationOrder(int length) {
720      ArrayList<Integer> order = new ArrayList<>(length);
721      for (int i = 0; i < length; i++) {
722        order.add(i);
723      }
724      Collections.shuffle(order);
725      return order;
726    }
727  }
728
729  static class RandomCandidateGenerator extends CandidateGenerator {
730
731    @Override
732    Cluster.Action generate(Cluster cluster) {
733
734      int thisServer = pickRandomServer(cluster);
735
736      // Pick the other server
737      int otherServer = pickOtherRandomServer(cluster, thisServer);
738
739      return pickRandomRegions(cluster, thisServer, otherServer);
740    }
741  }
742
743  static class LoadCandidateGenerator extends CandidateGenerator {
744
745    @Override
746    Cluster.Action generate(Cluster cluster) {
747      cluster.sortServersByRegionCount();
748      int thisServer = pickMostLoadedServer(cluster, -1);
749      int otherServer = pickLeastLoadedServer(cluster, thisServer);
750
751      return pickRandomRegions(cluster, thisServer, otherServer);
752    }
753
754    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
755      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
756
757      int index = 0;
758      while (servers[index] == null || servers[index] == thisServer) {
759        index++;
760        if (index == servers.length) {
761          return -1;
762        }
763      }
764      return servers[index];
765    }
766
767    private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
768      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
769
770      int index = servers.length - 1;
771      while (servers[index] == null || servers[index] == thisServer) {
772        index--;
773        if (index < 0) {
774          return -1;
775        }
776      }
777      return servers[index];
778    }
779  }
780
781  static class LocalityBasedCandidateGenerator extends CandidateGenerator {
782
783    private MasterServices masterServices;
784
785    LocalityBasedCandidateGenerator(MasterServices masterServices) {
786      this.masterServices = masterServices;
787    }
788
789    @Override
790    Cluster.Action generate(Cluster cluster) {
791      if (this.masterServices == null) {
792        int thisServer = pickRandomServer(cluster);
793        // Pick the other server
794        int otherServer = pickOtherRandomServer(cluster, thisServer);
795        return pickRandomRegions(cluster, thisServer, otherServer);
796      }
797
798      // Randomly iterate through regions until you find one that is not on ideal host
799      for (int region : getRandomIterationOrder(cluster.numRegions)) {
800        int currentServer = cluster.regionIndexToServerIndex[region];
801        if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
802          Optional<Action> potential = tryMoveOrSwap(
803              cluster,
804              currentServer,
805              region,
806              cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
807          );
808          if (potential.isPresent()) {
809            return potential.get();
810          }
811        }
812      }
813      return Cluster.NullAction;
814    }
815
816    /**
817     * Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
818     * Returns empty optional if no move can be found
819     */
820    private Optional<Action> tryMoveOrSwap(Cluster cluster,
821                                           int fromServer,
822                                           int fromRegion,
823                                           int toServer) {
824      // Try move first. We know apriori fromRegion has the highest locality on toServer
825      if (cluster.serverHasTooFewRegions(toServer)) {
826        return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
827      }
828
829      // Compare locality gain/loss from swapping fromRegion with regions on toServer
830      double fromRegionLocalityDelta =
831          getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
832      for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
833        int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
834        double toRegionLocalityDelta =
835            getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
836        // If locality would remain neutral or improve, attempt the swap
837        if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
838          return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
839        }
840      }
841
842      return Optional.absent();
843    }
844
845    private double getWeightedLocality(Cluster cluster, int region, int server) {
846      return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
847    }
848
849    void setServices(MasterServices services) {
850      this.masterServices = services;
851    }
852  }
853
854  /**
855   * Generates candidates which moves the replicas out of the region server for
856   * co-hosted region replicas
857   */
858  static class RegionReplicaCandidateGenerator extends CandidateGenerator {
859
860    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
861
862    /**
863     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
864     * (a group is a server, host or rack)
865     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
866     * primariesOfRegionsPerHost or primariesOfRegionsPerRack
867     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
868     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
869     * @return a regionIndex for the selected primary or -1 if there is no co-locating
870     */
871    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
872        , int[] regionIndexToPrimaryIndex) {
873      int currentPrimary = -1;
874      int currentPrimaryIndex = -1;
875      int selectedPrimaryIndex = -1;
876      double currentLargestRandom = -1;
877      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
878      // ids for the regions hosted in server, a consecutive repetition means that replicas
879      // are co-hosted
880      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
881        int primary = j < primariesOfRegionsPerGroup.length
882            ? primariesOfRegionsPerGroup[j] : -1;
883        if (primary != currentPrimary) { // check for whether we see a new primary
884          int numReplicas = j - currentPrimaryIndex;
885          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
886            // decide to select this primary region id or not
887            double currentRandom = RANDOM.nextDouble();
888            // we don't know how many region replicas are co-hosted, we will randomly select one
889            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
890            if (currentRandom > currentLargestRandom) {
891              selectedPrimaryIndex = currentPrimary;
892              currentLargestRandom = currentRandom;
893            }
894          }
895          currentPrimary = primary;
896          currentPrimaryIndex = j;
897        }
898      }
899
900      // we have found the primary id for the region to move. Now find the actual regionIndex
901      // with the given primary, prefer to move the secondary region.
902      for (int j = 0; j < regionsPerGroup.length; j++) {
903        int regionIndex = regionsPerGroup[j];
904        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
905          // always move the secondary, not the primary
906          if (selectedPrimaryIndex != regionIndex) {
907            return regionIndex;
908          }
909        }
910      }
911      return -1;
912    }
913
914    @Override
915    Cluster.Action generate(Cluster cluster) {
916      int serverIndex = pickRandomServer(cluster);
917      if (cluster.numServers <= 1 || serverIndex == -1) {
918        return Cluster.NullAction;
919      }
920
921      int regionIndex = selectCoHostedRegionPerGroup(
922        cluster.primariesOfRegionsPerServer[serverIndex],
923        cluster.regionsPerServer[serverIndex],
924        cluster.regionIndexToPrimaryIndex);
925
926      // if there are no pairs of region replicas co-hosted, default to random generator
927      if (regionIndex == -1) {
928        // default to randompicker
929        return randomGenerator.generate(cluster);
930      }
931
932      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
933      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
934      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
935    }
936  }
937
938  /**
939   * Generates candidates which moves the replicas out of the rack for
940   * co-hosted region replicas in the same rack
941   */
942  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
943    @Override
944    Cluster.Action generate(Cluster cluster) {
945      int rackIndex = pickRandomRack(cluster);
946      if (cluster.numRacks <= 1 || rackIndex == -1) {
947        return super.generate(cluster);
948      }
949
950      int regionIndex = selectCoHostedRegionPerGroup(
951        cluster.primariesOfRegionsPerRack[rackIndex],
952        cluster.regionsPerRack[rackIndex],
953        cluster.regionIndexToPrimaryIndex);
954
955      // if there are no pairs of region replicas co-hosted, default to random generator
956      if (regionIndex == -1) {
957        // default to randompicker
958        return randomGenerator.generate(cluster);
959      }
960
961      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
962      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
963
964      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
965      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
966      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
967      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
968    }
969  }
970
971  /**
972   * Base class of StochasticLoadBalancer's Cost Functions.
973   */
974  abstract static class CostFunction {
975
976    private float multiplier = 0;
977
978    protected Cluster cluster;
979
980    CostFunction(Configuration c) {
981    }
982
983    boolean isNeeded() {
984      return true;
985    }
986    float getMultiplier() {
987      return multiplier;
988    }
989
990    void setMultiplier(float m) {
991      this.multiplier = m;
992    }
993
994    /** Called once per LB invocation to give the cost function
995     * to initialize it's state, and perform any costly calculation.
996     */
997    void init(Cluster cluster) {
998      this.cluster = cluster;
999    }
1000
1001    /** Called once per cluster Action to give the cost function
1002     * an opportunity to update it's state. postAction() is always
1003     * called at least once before cost() is called with the cluster
1004     * that this action is performed on. */
1005    void postAction(Action action) {
1006      switch (action.type) {
1007      case NULL: break;
1008      case ASSIGN_REGION:
1009        AssignRegionAction ar = (AssignRegionAction) action;
1010        regionMoved(ar.region, -1, ar.server);
1011        break;
1012      case MOVE_REGION:
1013        MoveRegionAction mra = (MoveRegionAction) action;
1014        regionMoved(mra.region, mra.fromServer, mra.toServer);
1015        break;
1016      case SWAP_REGIONS:
1017        SwapRegionsAction a = (SwapRegionsAction) action;
1018        regionMoved(a.fromRegion, a.fromServer, a.toServer);
1019        regionMoved(a.toRegion, a.toServer, a.fromServer);
1020        break;
1021      default:
1022        throw new RuntimeException("Uknown action:" + action.type);
1023      }
1024    }
1025
1026    protected void regionMoved(int region, int oldServer, int newServer) {
1027    }
1028
1029    abstract double cost();
1030
1031    /**
1032     * Function to compute a scaled cost using {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics}.
1033     * It assumes that this is a zero sum set of costs.  It assumes that the worst case
1034     * possible is all of the elements in one region server and the rest having 0.
1035     *
1036     * @param stats the costs
1037     * @return a scaled set of costs.
1038     */
1039    protected double costFromArray(double[] stats) {
1040      double totalCost = 0;
1041      double total = getSum(stats);
1042
1043      double count = stats.length;
1044      double mean = total/count;
1045
1046      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
1047      // a zero sum cost for this to make sense.
1048      double max = ((count - 1) * mean) + (total - mean);
1049
1050      // It's possible that there aren't enough regions to go around
1051      double min;
1052      if (count > total) {
1053        min = ((count - total) * mean) + ((1 - mean) * total);
1054      } else {
1055        // Some will have 1 more than everything else.
1056        int numHigh = (int) (total - (Math.floor(mean) * count));
1057        int numLow = (int) (count - numHigh);
1058
1059        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
1060
1061      }
1062      min = Math.max(0, min);
1063      for (int i=0; i<stats.length; i++) {
1064        double n = stats[i];
1065        double diff = Math.abs(mean - n);
1066        totalCost += diff;
1067      }
1068
1069      double scaled =  scale(min, max, totalCost);
1070      return scaled;
1071    }
1072
1073    private double getSum(double[] stats) {
1074      double total = 0;
1075      for(double s:stats) {
1076        total += s;
1077      }
1078      return total;
1079    }
1080
1081    /**
1082     * Scale the value between 0 and 1.
1083     *
1084     * @param min   Min value
1085     * @param max   The Max value
1086     * @param value The value to be scaled.
1087     * @return The scaled value.
1088     */
1089    protected double scale(double min, double max, double value) {
1090      if (max <= min || value <= min) {
1091        return 0;
1092      }
1093      if ((max - min) == 0) return 0;
1094
1095      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
1096    }
1097  }
1098
1099  /**
1100   * Given the starting state of the regions and a potential ending state
1101   * compute cost based upon the number of regions that have moved.
1102   */
1103  static class MoveCostFunction extends CostFunction {
1104    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
1105    private static final String MAX_MOVES_PERCENT_KEY =
1106        "hbase.master.balancer.stochastic.maxMovePercent";
1107    private static final float DEFAULT_MOVE_COST = 7;
1108    private static final int DEFAULT_MAX_MOVES = 600;
1109    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1110
1111    private final float maxMovesPercent;
1112
1113    MoveCostFunction(Configuration conf) {
1114      super(conf);
1115
1116      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
1117      // that large benefits are need to overcome the cost of a move.
1118      this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1119      // What percent of the number of regions a single run of the balancer can move.
1120      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1121    }
1122
1123    @Override
1124    double cost() {
1125      // Try and size the max number of Moves, but always be prepared to move some.
1126      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1127          DEFAULT_MAX_MOVES);
1128
1129      double moveCost = cluster.numMovedRegions;
1130
1131      // Don't let this single balance move more than the max moves.
1132      // This allows better scaling to accurately represent the actual cost of a move.
1133      if (moveCost > maxMoves) {
1134        return 1000000;   // return a number much greater than any of the other cost
1135      }
1136
1137      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
1138    }
1139  }
1140
1141  /**
1142   * Compute the cost of a potential cluster state from skew in number of
1143   * regions on a cluster.
1144   */
1145  static class RegionCountSkewCostFunction extends CostFunction {
1146    private static final String REGION_COUNT_SKEW_COST_KEY =
1147        "hbase.master.balancer.stochastic.regionCountCost";
1148    private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1149
1150    private double[] stats = null;
1151
1152    RegionCountSkewCostFunction(Configuration conf) {
1153      super(conf);
1154      // Load multiplier should be the greatest as it is the most general way to balance data.
1155      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1156    }
1157
1158    @Override
1159    double cost() {
1160      if (stats == null || stats.length != cluster.numServers) {
1161        stats = new double[cluster.numServers];
1162      }
1163
1164      for (int i =0; i < cluster.numServers; i++) {
1165        stats[i] = cluster.regionsPerServer[i].length;
1166      }
1167
1168      return costFromArray(stats);
1169    }
1170  }
1171
1172  /**
1173   * Compute the cost of a potential cluster state from skew in number of
1174   * primary regions on a cluster.
1175   */
1176  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1177    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1178        "hbase.master.balancer.stochastic.primaryRegionCountCost";
1179    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1180
1181    private double[] stats = null;
1182
1183    PrimaryRegionCountSkewCostFunction(Configuration conf) {
1184      super(conf);
1185      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1186      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1187        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1188    }
1189
1190    @Override
1191    boolean isNeeded() {
1192      return cluster.hasRegionReplicas;
1193    }
1194
1195    @Override
1196    double cost() {
1197      if (!cluster.hasRegionReplicas) {
1198        return 0;
1199      }
1200      if (stats == null || stats.length != cluster.numServers) {
1201        stats = new double[cluster.numServers];
1202      }
1203
1204      for (int i = 0; i < cluster.numServers; i++) {
1205        stats[i] = 0;
1206        for (int regionIdx : cluster.regionsPerServer[i]) {
1207          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1208            stats[i]++;
1209          }
1210        }
1211      }
1212
1213      return costFromArray(stats);
1214    }
1215  }
1216
1217  /**
1218   * Compute the cost of a potential cluster configuration based upon how evenly
1219   * distributed tables are.
1220   */
1221  static class TableSkewCostFunction extends CostFunction {
1222
1223    private static final String TABLE_SKEW_COST_KEY =
1224        "hbase.master.balancer.stochastic.tableSkewCost";
1225    private static final float DEFAULT_TABLE_SKEW_COST = 35;
1226
1227    TableSkewCostFunction(Configuration conf) {
1228      super(conf);
1229      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1230    }
1231
1232    @Override
1233    double cost() {
1234      double max = cluster.numRegions;
1235      double min = ((double) cluster.numRegions) / cluster.numServers;
1236      double value = 0;
1237
1238      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1239        value += cluster.numMaxRegionsPerTable[i];
1240      }
1241
1242      return scale(min, max, value);
1243    }
1244  }
1245
1246  /**
1247   * Compute a cost of a potential cluster configuration based upon where
1248   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
1249   */
1250  static abstract class LocalityBasedCostFunction extends CostFunction {
1251
1252    private final LocalityType type;
1253
1254    private double bestLocality; // best case locality across cluster weighted by local data size
1255    private double locality; // current locality across cluster weighted by local data size
1256
1257    private MasterServices services;
1258
1259    LocalityBasedCostFunction(Configuration conf,
1260                              MasterServices srv,
1261                              LocalityType type,
1262                              String localityCostKey,
1263                              float defaultLocalityCost) {
1264      super(conf);
1265      this.type = type;
1266      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1267      this.services = srv;
1268      this.locality = 0.0;
1269      this.bestLocality = 0.0;
1270    }
1271
1272    /**
1273     * Maps region to the current entity (server or rack) on which it is stored
1274     */
1275    abstract int regionIndexToEntityIndex(int region);
1276
1277    public void setServices(MasterServices srvc) {
1278      this.services = srvc;
1279    }
1280
1281    @Override
1282    void init(Cluster cluster) {
1283      super.init(cluster);
1284      locality = 0.0;
1285      bestLocality = 0.0;
1286
1287      // If no master, no computation will work, so assume 0 cost
1288      if (this.services == null) {
1289        return;
1290      }
1291
1292      for (int region = 0; region < cluster.numRegions; region++) {
1293        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1294        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1295      }
1296
1297      // We normalize locality to be a score between 0 and 1.0 representing how good it
1298      // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1299      // (and the cost is 0)
1300      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1301    }
1302
1303    @Override
1304    protected void regionMoved(int region, int oldServer, int newServer) {
1305      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1306      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1307      if (this.services == null) {
1308        return;
1309      }
1310      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1311      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1312      locality += normalizedDelta;
1313    }
1314
1315    @Override
1316    double cost() {
1317      return 1 - locality;
1318    }
1319
1320    private int getMostLocalEntityForRegion(int region) {
1321      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1322    }
1323
1324    private double getWeightedLocality(int region, int entity) {
1325      return cluster.getOrComputeWeightedLocality(region, entity, type);
1326    }
1327
1328  }
1329
1330  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1331
1332    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1333    private static final float DEFAULT_LOCALITY_COST = 25;
1334
1335    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1336      super(
1337          conf,
1338          srv,
1339          LocalityType.SERVER,
1340          LOCALITY_COST_KEY,
1341          DEFAULT_LOCALITY_COST
1342      );
1343    }
1344
1345    @Override
1346    int regionIndexToEntityIndex(int region) {
1347      return cluster.regionIndexToServerIndex[region];
1348    }
1349  }
1350
1351  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1352
1353    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1354    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1355
1356    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1357      super(
1358          conf,
1359          services,
1360          LocalityType.RACK,
1361          RACK_LOCALITY_COST_KEY,
1362          DEFAULT_RACK_LOCALITY_COST
1363      );
1364    }
1365
1366    @Override
1367    int regionIndexToEntityIndex(int region) {
1368      return cluster.getRackForRegion(region);
1369    }
1370  }
1371
1372  /**
1373   * Base class the allows writing costs functions from rolling average of some
1374   * number from RegionLoad.
1375   */
1376  abstract static class CostFromRegionLoadFunction extends CostFunction {
1377
1378    private ClusterMetrics clusterStatus = null;
1379    private Map<String, Deque<BalancerRegionLoad>> loads = null;
1380    private double[] stats = null;
1381    CostFromRegionLoadFunction(Configuration conf) {
1382      super(conf);
1383    }
1384
1385    void setClusterMetrics(ClusterMetrics status) {
1386      this.clusterStatus = status;
1387    }
1388
1389    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1390      this.loads = l;
1391    }
1392
1393    @Override
1394    double cost() {
1395      if (clusterStatus == null || loads == null) {
1396        return 0;
1397      }
1398
1399      if (stats == null || stats.length != cluster.numServers) {
1400        stats = new double[cluster.numServers];
1401      }
1402
1403      for (int i =0; i < stats.length; i++) {
1404        //Cost this server has from RegionLoad
1405        long cost = 0;
1406
1407        // for every region on this server get the rl
1408        for(int regionIndex:cluster.regionsPerServer[i]) {
1409          Collection<BalancerRegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1410
1411          // Now if we found a region load get the type of cost that was requested.
1412          if (regionLoadList != null) {
1413            cost = (long) (cost + getRegionLoadCost(regionLoadList));
1414          }
1415        }
1416
1417        // Add the total cost to the stats.
1418        stats[i] = cost;
1419      }
1420
1421      // Now return the scaled cost from data held in the stats object.
1422      return costFromArray(stats);
1423    }
1424
1425    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1426      double cost = 0;
1427      for (BalancerRegionLoad rl : regionLoadList) {
1428        cost += getCostFromRl(rl);
1429      }
1430      return cost / regionLoadList.size();
1431    }
1432
1433    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1434  }
1435
1436  /**
1437   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
1438   * We do not compare about the actual rate in requests per second but rather the rate relative
1439   * to the rest of the regions.
1440   */
1441  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1442
1443    CostFromRegionLoadAsRateFunction(Configuration conf) {
1444      super(conf);
1445    }
1446
1447    @Override
1448    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1449      double cost = 0;
1450      double previous = 0;
1451      boolean isFirst = true;
1452      for (BalancerRegionLoad rl : regionLoadList) {
1453        double current = getCostFromRl(rl);
1454        if (isFirst) {
1455          isFirst = false;
1456        } else {
1457          cost += current - previous;
1458        }
1459        previous = current;
1460      }
1461      return Math.max(0, cost / (regionLoadList.size() - 1));
1462    }
1463  }
1464
1465  /**
1466   * Compute the cost of total number of read requests  The more unbalanced the higher the
1467   * computed cost will be.  This uses a rolling average of regionload.
1468   */
1469
1470  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1471
1472    private static final String READ_REQUEST_COST_KEY =
1473        "hbase.master.balancer.stochastic.readRequestCost";
1474    private static final float DEFAULT_READ_REQUEST_COST = 5;
1475
1476    ReadRequestCostFunction(Configuration conf) {
1477      super(conf);
1478      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1479    }
1480
1481    @Override
1482    protected double getCostFromRl(BalancerRegionLoad rl) {
1483      return rl.getReadRequestsCount();
1484    }
1485  }
1486
1487  /**
1488   * Compute the cost of total number of write requests.  The more unbalanced the higher the
1489   * computed cost will be.  This uses a rolling average of regionload.
1490   */
1491  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1492
1493    private static final String WRITE_REQUEST_COST_KEY =
1494        "hbase.master.balancer.stochastic.writeRequestCost";
1495    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1496
1497    WriteRequestCostFunction(Configuration conf) {
1498      super(conf);
1499      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1500    }
1501
1502    @Override
1503    protected double getCostFromRl(BalancerRegionLoad rl) {
1504      return rl.getWriteRequestsCount();
1505    }
1506  }
1507
1508  /**
1509   * A cost function for region replicas. We give a very high cost to hosting
1510   * replicas of the same region in the same host. We do not prevent the case
1511   * though, since if numReplicas > numRegionServers, we still want to keep the
1512   * replica open.
1513   */
1514  static class RegionReplicaHostCostFunction extends CostFunction {
1515    private static final String REGION_REPLICA_HOST_COST_KEY =
1516        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1517    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1518
1519    long maxCost = 0;
1520    long[] costsPerGroup; // group is either server, host or rack
1521    int[][] primariesOfRegionsPerGroup;
1522
1523    public RegionReplicaHostCostFunction(Configuration conf) {
1524      super(conf);
1525      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1526        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1527    }
1528
1529    @Override
1530    void init(Cluster cluster) {
1531      super.init(cluster);
1532      // max cost is the case where every region replica is hosted together regardless of host
1533      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1534      costsPerGroup = new long[cluster.numHosts];
1535      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1536          ? cluster.primariesOfRegionsPerHost
1537          : cluster.primariesOfRegionsPerServer;
1538      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1539        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1540      }
1541    }
1542
1543    long getMaxCost(Cluster cluster) {
1544      if (!cluster.hasRegionReplicas) {
1545        return 0; // short circuit
1546      }
1547      // max cost is the case where every region replica is hosted together regardless of host
1548      int[] primariesOfRegions = new int[cluster.numRegions];
1549      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1550          cluster.regions.length);
1551
1552      Arrays.sort(primariesOfRegions);
1553
1554      // compute numReplicas from the sorted array
1555      return costPerGroup(primariesOfRegions);
1556    }
1557
1558    @Override
1559    boolean isNeeded() {
1560      return cluster.hasRegionReplicas;
1561    }
1562
1563    @Override
1564    double cost() {
1565      if (maxCost <= 0) {
1566        return 0;
1567      }
1568
1569      long totalCost = 0;
1570      for (int i = 0 ; i < costsPerGroup.length; i++) {
1571        totalCost += costsPerGroup[i];
1572      }
1573      return scale(0, maxCost, totalCost);
1574    }
1575
1576    /**
1577     * For each primary region, it computes the total number of replicas in the array (numReplicas)
1578     * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1579     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1580     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1581     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1582     * @return a sum of numReplicas-1 squared for each primary region in the group.
1583     */
1584    protected long costPerGroup(int[] primariesOfRegions) {
1585      long cost = 0;
1586      int currentPrimary = -1;
1587      int currentPrimaryIndex = -1;
1588      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1589      // sharing the same primary will have consecutive numbers in the array.
1590      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1591        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1592        if (primary != currentPrimary) { // we see a new primary
1593          int numReplicas = j - currentPrimaryIndex;
1594          // square the cost
1595          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1596            cost += (numReplicas - 1) * (numReplicas - 1);
1597          }
1598          currentPrimary = primary;
1599          currentPrimaryIndex = j;
1600        }
1601      }
1602
1603      return cost;
1604    }
1605
1606    @Override
1607    protected void regionMoved(int region, int oldServer, int newServer) {
1608      if (maxCost <= 0) {
1609        return; // no need to compute
1610      }
1611      if (cluster.multiServersPerHost) {
1612        int oldHost = cluster.serverIndexToHostIndex[oldServer];
1613        int newHost = cluster.serverIndexToHostIndex[newServer];
1614        if (newHost != oldHost) {
1615          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1616          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1617        }
1618      } else {
1619        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1620        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1621      }
1622    }
1623  }
1624
1625  /**
1626   * A cost function for region replicas for the rack distribution. We give a relatively high
1627   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1628   * though.
1629   */
1630  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1631    private static final String REGION_REPLICA_RACK_COST_KEY =
1632        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1633    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1634
1635    public RegionReplicaRackCostFunction(Configuration conf) {
1636      super(conf);
1637      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1638        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1639    }
1640
1641    @Override
1642    void init(Cluster cluster) {
1643      this.cluster = cluster;
1644      if (cluster.numRacks <= 1) {
1645        maxCost = 0;
1646        return; // disabled for 1 rack
1647      }
1648      // max cost is the case where every region replica is hosted together regardless of rack
1649      maxCost = getMaxCost(cluster);
1650      costsPerGroup = new long[cluster.numRacks];
1651      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1652        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1653      }
1654    }
1655
1656    @Override
1657    protected void regionMoved(int region, int oldServer, int newServer) {
1658      if (maxCost <= 0) {
1659        return; // no need to compute
1660      }
1661      int oldRack = cluster.serverIndexToRackIndex[oldServer];
1662      int newRack = cluster.serverIndexToRackIndex[newServer];
1663      if (newRack != oldRack) {
1664        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1665        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1666      }
1667    }
1668  }
1669
1670  /**
1671   * Compute the cost of total memstore size.  The more unbalanced the higher the
1672   * computed cost will be.  This uses a rolling average of regionload.
1673   */
1674  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
1675
1676    private static final String MEMSTORE_SIZE_COST_KEY =
1677        "hbase.master.balancer.stochastic.memstoreSizeCost";
1678    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1679
1680    MemStoreSizeCostFunction(Configuration conf) {
1681      super(conf);
1682      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1683    }
1684
1685    @Override
1686    protected double getCostFromRl(BalancerRegionLoad rl) {
1687      return rl.getMemStoreSizeMB();
1688    }
1689  }
1690  /**
1691   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1692   * computed cost will be.  This uses a rolling average of regionload.
1693   */
1694  static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1695
1696    private static final String STOREFILE_SIZE_COST_KEY =
1697        "hbase.master.balancer.stochastic.storefileSizeCost";
1698    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1699
1700    StoreFileCostFunction(Configuration conf) {
1701      super(conf);
1702      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1703    }
1704
1705    @Override
1706    protected double getCostFromRl(BalancerRegionLoad rl) {
1707      return rl.getStorefileSizeMB();
1708    }
1709  }
1710
1711  /**
1712   * A helper function to compose the attribute name from tablename and costfunction name
1713   */
1714  public static String composeAttributeName(String tableName, String costFunctionName) {
1715    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1716  }
1717}