001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Random;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.ClusterMetrics;
034import org.apache.hadoop.hbase.HBaseInterfaceAudience;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.RegionMetrics;
037import org.apache.hadoop.hbase.ServerMetrics;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.master.MasterServices;
042import org.apache.hadoop.hbase.master.RegionPlan;
043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
048import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
055import org.apache.hbase.thirdparty.com.google.common.base.Optional;
056import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
057
058
059/**
 * <p>This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
063 * <ul>
064 * <li>Region Load</li>
065 * <li>Table Load</li>
066 * <li>Data Locality</li>
067 * <li>Memstore Sizes</li>
068 * <li>Storefile Sizes</li>
069 * </ul>
070 *
071 *
 * <p>Every cost function returns a number between 0 and 1 inclusive, where 0 is the lowest-cost
 * (best) solution and 1 is the highest possible cost (the worst solution). The computed costs are
 * scaled by their respective multipliers:</p>
075 *
076 * <ul>
077 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
078 *   <li>hbase.master.balancer.stochastic.moveCost</li>
079 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
080 *   <li>hbase.master.balancer.stochastic.localityCost</li>
081 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
082 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
083 * </ul>
084 *
085 * <p>In addition to the above configurations, the balancer can be tuned by the following
086 * configuration values:</p>
087 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions, which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
091 *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
092 *   regions is multiplied to try and get the number of times the balancer will
093 *   mutate all servers.</li>
094 *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
095 *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
096 *   value and the above computation.</li>
097 * </ul>
098 *
099 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
100 * so that the balancer gets the full picture of all loads on the cluster.</p>
101 */
102@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
103@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
104  justification="Complaint is about costFunctions not being synchronized; not end of the world")
105public class StochasticLoadBalancer extends BaseLoadBalancer {
106
  /** Config key for the per-region coefficient used to compute the walk's step budget. */
  protected static final String STEPS_PER_REGION_KEY =
      "hbase.master.balancer.stochastic.stepsPerRegion";
  /** Config key for the step cap of the stochastic walk (a floor instead when runMaxSteps). */
  protected static final String MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.maxSteps";
  /** Config key: when true the step budget is at least maxSteps rather than at most maxSteps. */
  protected static final String RUN_MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.runMaxSteps";
  /** Config key for the time budget, in milliseconds, of a single balancer run. */
  protected static final String MAX_RUNNING_TIME_KEY =
      "hbase.master.balancer.stochastic.maxRunningTime";
  /** Config key for how many of the most recent loads are remembered per region. */
  protected static final String KEEP_REGION_LOADS =
      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  // NOTE(review): not referenced in the visible part of this file; presumably used further down.
  private static final String TABLE_FUNCTION_SEP = "_";
  /** Config key for the weighted average cost below which no balancing is attempted. */
  protected static final String MIN_COST_NEED_BALANCE_KEY =
      "hbase.master.balancer.stochastic.minCostNeedBalance";

  // Shared random source for choosing candidate generators and for the generators' picks.
  protected static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Region name -> most recent BalancerRegionLoads of that region (oldest at the head);
  // rebuilt by updateRegionLoad() whenever new cluster metrics arrive.
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // values are defaults
  private int maxSteps = 1000000;
  private boolean runMaxSteps = false;
  private int stepsPerRegion = 800;
  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
  private int numRegionLoadsToRemember = 15;
  private float minCostNeedBalance = 0.05f;

  // Generators proposing actions for the walk; one is chosen at random on every step.
  private List<CandidateGenerator> candidateGenerators;
  // Cost functions driven by the remembered region loads; they are also part of costFunctions.
  private CostFromRegionLoadFunction[] regionLoadFunctions;
  private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

  // to save and report costs to JMX
  // curOverallCost/curFunctionCosts hold the best state found so far; tempFunctionCosts holds the
  // weighted per-function costs written by the most recent computeCost() call. Both arrays are
  // indexed in parallel with costFunctions.
  private Double curOverallCost = 0d;
  private Double[] tempFunctionCosts;
  private Double[] curFunctionCosts;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
  // Read from HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE; in by-table mode the JMX metrics are
  // sized per table.
  private boolean isByTable = false;
  // Table passed to the last balanceCluster(TableName, Map) call; labels the JMX cost metrics.
  private TableName tableName = null;
152
153  /**
154   * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
155   * default MetricsBalancer
156   */
157  public StochasticLoadBalancer() {
158    super(new MetricsStochasticBalancer());
159  }
160
161  @Override
162  public void onConfigurationChange(Configuration conf) {
163    setConf(conf);
164  }
165
166  @Override
167  public synchronized void setConf(Configuration conf) {
168    super.setConf(conf);
169    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
170    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
171    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
172    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
173
174    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
175    isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
176    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
177    if (localityCandidateGenerator == null) {
178      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
179    }
180    localityCost = new ServerLocalityCostFunction(conf, services);
181    rackLocalityCost = new RackLocalityCostFunction(conf, services);
182
183    if (this.candidateGenerators == null) {
184      candidateGenerators = Lists.newArrayList();
185      candidateGenerators.add(new RandomCandidateGenerator());
186      candidateGenerators.add(new LoadCandidateGenerator());
187      candidateGenerators.add(localityCandidateGenerator);
188      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
189    }
190    regionLoadFunctions = new CostFromRegionLoadFunction[] {
191      new ReadRequestCostFunction(conf),
192      new CPRequestCostFunction(conf),
193      new WriteRequestCostFunction(conf),
194      new MemStoreSizeCostFunction(conf),
195      new StoreFileCostFunction(conf)
196    };
197    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
198    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
199    costFunctions = new CostFunction[]{
200      new RegionCountSkewCostFunction(conf),
201      new PrimaryRegionCountSkewCostFunction(conf),
202      new MoveCostFunction(conf),
203      localityCost,
204      rackLocalityCost,
205      new TableSkewCostFunction(conf),
206      regionReplicaHostCostFunction,
207      regionReplicaRackCostFunction,
208      regionLoadFunctions[0],
209      regionLoadFunctions[1],
210      regionLoadFunctions[2],
211      regionLoadFunctions[3],
212      regionLoadFunctions[4]
213    };
214    curFunctionCosts= new Double[costFunctions.length];
215    tempFunctionCosts= new Double[costFunctions.length];
216    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
217        ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
218  }
219
220  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
221    this.candidateGenerators = customCandidateGenerators;
222  }
223
224  @Override
225  protected void setSlop(Configuration conf) {
226    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
227  }
228
229  @Override
230  public synchronized void setClusterMetrics(ClusterMetrics st) {
231    super.setClusterMetrics(st);
232    updateRegionLoad();
233    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
234      cost.setClusterMetrics(st);
235    }
236
237    // update metrics size
238    try {
239      // by-table or ensemble mode
240      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
241      int functionsCount = getCostFunctionNames().length;
242
243      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
244    } catch (Exception e) {
245      LOG.error("failed to get the size of all tables", e);
246    }
247  }
248
249  /**
250   * Update the number of metrics that are reported to JMX
251   */
252  public void updateMetricsSize(int size) {
253    if (metricsBalancer instanceof MetricsStochasticBalancer) {
254        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
255    }
256  }
257
258  @Override
259  public synchronized void setMasterServices(MasterServices masterServices) {
260    super.setMasterServices(masterServices);
261    this.localityCost.setServices(masterServices);
262    this.rackLocalityCost.setServices(masterServices);
263    this.localityCandidateGenerator.setServices(masterServices);
264  }
265
266  @Override
267  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
268    regionReplicaHostCostFunction.init(c);
269    if (regionReplicaHostCostFunction.cost() > 0) return true;
270    regionReplicaRackCostFunction.init(c);
271    if (regionReplicaRackCostFunction.cost() > 0) return true;
272    return false;
273  }
274
275  @Override
276  protected boolean needsBalance(Cluster cluster) {
277    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
278    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
279      if (LOG.isDebugEnabled()) {
280        LOG.debug("Not running balancer because only " + cs.getNumServers()
281            + " active regionserver(s)");
282      }
283      return false;
284    }
285    if (areSomeRegionReplicasColocated(cluster)) {
286      return true;
287    }
288
289    double total = 0.0;
290    float sumMultiplier = 0.0f;
291    for (CostFunction c : costFunctions) {
292      float multiplier = c.getMultiplier();
293      if (multiplier <= 0) {
294        continue;
295      }
296      if (!c.isNeeded()) {
297        LOG.debug("{} not needed", c.getClass().getSimpleName());
298        continue;
299      }
300      sumMultiplier += multiplier;
301      total += c.cost() * multiplier;
302    }
303
304    if (total <= 0 || sumMultiplier <= 0
305        || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) {
306      if (LOG.isTraceEnabled()) {
307        LOG.trace("Skipping load balancing because balanced cluster; " + "total cost is " + total
308          + ", sum multiplier is " + sumMultiplier + " min cost which need balance is "
309          + minCostNeedBalance);
310      }
311      return false;
312    }
313    return true;
314  }
315
316  @Override
317  public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
318    List<RegionInfo>> clusterState) {
319    this.tableName = tableName;
320    return balanceCluster(clusterState);
321  }
322
323  @VisibleForTesting
324  Cluster.Action nextAction(Cluster cluster) {
325    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
326            .generate(cluster);
327  }
328
329  /**
330   * Given the cluster state this will try and approach an optimal balance. This
331   * should always approach the optimal state given enough steps.
332   */
333  @Override
334  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
335    List<RegionInfo>> clusterState) {
336    List<RegionPlan> plans = balanceMasterRegions(clusterState);
337    if (plans != null || clusterState == null || clusterState.size() <= 1) {
338      return plans;
339    }
340
341    if (masterServerName != null && clusterState.containsKey(masterServerName)) {
342      if (clusterState.size() <= 2) {
343        return null;
344      }
345      clusterState = new HashMap<>(clusterState);
346      clusterState.remove(masterServerName);
347    }
348
349    // On clusters with lots of HFileLinks or lots of reference files,
350    // instantiating the storefile infos can be quite expensive.
351    // Allow turning this feature off if the locality cost is not going to
352    // be used in any computations.
353    RegionLocationFinder finder = null;
354    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
355        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
356      finder = this.regionFinder;
357    }
358
359    //The clusterState that is given to this method contains the state
360    //of all the regions in the table(s) (that's true today)
361    // Keep track of servers to iterate through them.
362    Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
363
364    long startTime = EnvironmentEdgeManager.currentTime();
365
366    initCosts(cluster);
367
368    if (!needsBalance(cluster)) {
369      return null;
370    }
371
372    double currentCost = computeCost(cluster, Double.MAX_VALUE);
373    curOverallCost = currentCost;
374    for (int i = 0; i < this.curFunctionCosts.length; i++) {
375      curFunctionCosts[i] = tempFunctionCosts[i];
376    }
377    double initCost = currentCost;
378    double newCost = currentCost;
379
380    long computedMaxSteps;
381    if (runMaxSteps) {
382      computedMaxSteps = Math.max(this.maxSteps,
383          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
384    } else {
385      long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion *
386          (long)cluster.numServers;
387      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
388      if (calculatedMaxSteps > maxSteps) {
389        LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
390            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
391            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
392            + "(This config change does not require service restart)", calculatedMaxSteps,
393            maxSteps);
394      }
395    }
396    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
397        + functionCost() + " computedMaxSteps: " + computedMaxSteps);
398
399    // Perform a stochastic walk to see if we can get a good fit.
400    long step;
401
402    for (step = 0; step < computedMaxSteps; step++) {
403      Cluster.Action action = nextAction(cluster);
404
405      if (action.type == Type.NULL) {
406        continue;
407      }
408
409      cluster.doAction(action);
410      updateCostsWithAction(cluster, action);
411
412      newCost = computeCost(cluster, currentCost);
413
414      // Should this be kept?
415      if (newCost < currentCost) {
416        currentCost = newCost;
417
418        // save for JMX
419        curOverallCost = currentCost;
420        for (int i = 0; i < this.curFunctionCosts.length; i++) {
421          curFunctionCosts[i] = tempFunctionCosts[i];
422        }
423      } else {
424        // Put things back the way they were before.
425        // TODO: undo by remembering old values
426        Action undoAction = action.undoAction();
427        cluster.doAction(undoAction);
428        updateCostsWithAction(cluster, undoAction);
429      }
430
431      if (EnvironmentEdgeManager.currentTime() - startTime >
432          maxRunningTime) {
433        break;
434      }
435    }
436    long endTime = EnvironmentEdgeManager.currentTime();
437
438    metricsBalancer.balanceCluster(endTime - startTime);
439
440    // update costs metrics
441    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
442    if (initCost > currentCost) {
443      plans = createRegionPlans(cluster);
444      LOG.info("Finished computing new load balance plan. Computation took {}" +
445        " to try {} different iterations.  Found a solution that moves " +
446        "{} regions; Going from a computed cost of {}" +
447        " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime),
448        step, plans.size(), initCost, currentCost);
449      return plans;
450    }
451    LOG.info("Could not find a better load balance plan.  Tried {} different configurations in " +
452      "{}, and did not find anything with a computed cost less than {}", step,
453      java.time.Duration.ofMillis(endTime - startTime), initCost);
454    return null;
455  }
456
457  /**
458   * update costs to JMX
459   */
460  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
461    if (tableName == null) return;
462
463    // check if the metricsBalancer is MetricsStochasticBalancer before casting
464    if (metricsBalancer instanceof MetricsStochasticBalancer) {
465      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
466      // overall cost
467      balancer.updateStochasticCost(tableName.getNameAsString(),
468        "Overall", "Overall cost", overall);
469
470      // each cost function
471      for (int i = 0; i < costFunctions.length; i++) {
472        CostFunction costFunction = costFunctions[i];
473        String costFunctionName = costFunction.getClass().getSimpleName();
474        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
475        // TODO: cost function may need a specific description
476        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
477          "The percent of " + costFunctionName, costPercent);
478      }
479    }
480  }
481
482  private String functionCost() {
483    StringBuilder builder = new StringBuilder();
484    for (CostFunction c:costFunctions) {
485      builder.append(c.getClass().getSimpleName());
486      builder.append(" : (");
487      builder.append(c.getMultiplier());
488      builder.append(", ");
489      builder.append(c.cost());
490      builder.append("); ");
491    }
492    return builder.toString();
493  }
494
495  /**
496   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
497   * state.
498   *
499   * @param cluster The state of the cluster
500   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
501   */
502  private List<RegionPlan> createRegionPlans(Cluster cluster) {
503    List<RegionPlan> plans = new LinkedList<>();
504    for (int regionIndex = 0;
505         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
506      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
507      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
508
509      if (initialServerIndex != newServerIndex) {
510        RegionInfo region = cluster.regions[regionIndex];
511        ServerName initialServer = cluster.servers[initialServerIndex];
512        ServerName newServer = cluster.servers[newServerIndex];
513
514        if (LOG.isTraceEnabled()) {
515          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
516              + initialServer.getHostname() + " to " + newServer.getHostname());
517        }
518        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
519        plans.add(rp);
520      }
521    }
522    return plans;
523  }
524
525  /**
526   * Store the current region loads.
527   */
528  private synchronized void updateRegionLoad() {
529    // We create a new hashmap so that regions that are no longer there are removed.
530    // However we temporarily need the old loads so we can use them to keep the rolling average.
531    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
532    loads = new HashMap<>();
533
534    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
535      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
536        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
537        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
538        if (rLoads == null) {
539          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
540        } else if (rLoads.size() >= numRegionLoadsToRemember) {
541          rLoads.remove();
542        }
543        rLoads.add(new BalancerRegionLoad(rm));
544        loads.put(regionNameAsString, rLoads);
545      });
546    });
547
548    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
549      cost.setLoads(loads);
550    }
551  }
552
553  protected void initCosts(Cluster cluster) {
554    for (CostFunction c:costFunctions) {
555      c.init(cluster);
556    }
557  }
558
559  protected void updateCostsWithAction(Cluster cluster, Action action) {
560    for (CostFunction c : costFunctions) {
561      c.postAction(action);
562    }
563  }
564
565  /**
566   * Get the names of the cost functions
567   */
568  public String[] getCostFunctionNames() {
569    if (costFunctions == null) return null;
570    String[] ret = new String[costFunctions.length];
571    for (int i = 0; i < costFunctions.length; i++) {
572      CostFunction c = costFunctions[i];
573      ret[i] = c.getClass().getSimpleName();
574    }
575
576    return ret;
577  }
578
579  /**
580   * This is the main cost function.  It will compute a cost associated with a proposed cluster
581   * state.  All different costs will be combined with their multipliers to produce a double cost.
582   *
583   * @param cluster The state of the cluster
584   * @param previousCost the previous cost. This is used as an early out.
585   * @return a double of a cost associated with the proposed cluster state.  This cost is an
586   *         aggregate of all individual cost functions.
587   */
588  protected double computeCost(Cluster cluster, double previousCost) {
589    double total = 0;
590
591    for (int i = 0; i < costFunctions.length; i++) {
592      CostFunction c = costFunctions[i];
593      this.tempFunctionCosts[i] = 0.0;
594
595      if (c.getMultiplier() <= 0) {
596        continue;
597      }
598
599      Float multiplier = c.getMultiplier();
600      Double cost = c.cost();
601
602      this.tempFunctionCosts[i] = multiplier*cost;
603      total += this.tempFunctionCosts[i];
604
605      if (total > previousCost) {
606        break;
607      }
608    }
609
610    return total;
611  }
612
613  /** Generates a candidate action to be applied to the cluster for cost function search */
614  abstract static class CandidateGenerator {
615    abstract Cluster.Action generate(Cluster cluster);
616
617    /**
618     * From a list of regions pick a random one. Null can be returned which
619     * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
620     * rather than swap.
621     *
622     * @param cluster        The state of the cluster
623     * @param server         index of the server
624     * @param chanceOfNoSwap Chance that this will decide to try a move rather
625     *                       than a swap.
626     * @return a random {@link RegionInfo} or null if an asymmetrical move is
627     *         suggested.
628     */
629    protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
630      // Check to see if this is just a move.
631      if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
632        // signal a move only.
633        return -1;
634      }
635      int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
636      return cluster.regionsPerServer[server][rand];
637
638    }
639    protected int pickRandomServer(Cluster cluster) {
640      if (cluster.numServers < 1) {
641        return -1;
642      }
643
644      return RANDOM.nextInt(cluster.numServers);
645    }
646
647    protected int pickRandomRack(Cluster cluster) {
648      if (cluster.numRacks < 1) {
649        return -1;
650      }
651
652      return RANDOM.nextInt(cluster.numRacks);
653    }
654
655    protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
656      if (cluster.numServers < 2) {
657        return -1;
658      }
659      while (true) {
660        int otherServerIndex = pickRandomServer(cluster);
661        if (otherServerIndex != serverIndex) {
662          return otherServerIndex;
663        }
664      }
665    }
666
667    protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
668      if (cluster.numRacks < 2) {
669        return -1;
670      }
671      while (true) {
672        int otherRackIndex = pickRandomRack(cluster);
673        if (otherRackIndex != rackIndex) {
674          return otherRackIndex;
675        }
676      }
677    }
678
679    protected Cluster.Action pickRandomRegions(Cluster cluster,
680                                                       int thisServer,
681                                                       int otherServer) {
682      if (thisServer < 0 || otherServer < 0) {
683        return Cluster.NullAction;
684      }
685
686      // Decide who is most likely to need another region
687      int thisRegionCount = cluster.getNumRegions(thisServer);
688      int otherRegionCount = cluster.getNumRegions(otherServer);
689
690      // Assign the chance based upon the above
691      double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
692      double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
693
694      int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
695      int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
696
697      return getAction(thisServer, thisRegion, otherServer, otherRegion);
698    }
699
700    protected Cluster.Action getAction(int fromServer, int fromRegion,
701        int toServer, int toRegion) {
702      if (fromServer < 0 || toServer < 0) {
703        return Cluster.NullAction;
704      }
705      if (fromRegion > 0 && toRegion > 0) {
706        return new Cluster.SwapRegionsAction(fromServer, fromRegion,
707          toServer, toRegion);
708      } else if (fromRegion > 0) {
709        return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
710      } else if (toRegion > 0) {
711        return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
712      } else {
713        return Cluster.NullAction;
714      }
715    }
716
717    /**
718     * Returns a random iteration order of indexes of an array with size length
719     */
720    protected List<Integer> getRandomIterationOrder(int length) {
721      ArrayList<Integer> order = new ArrayList<>(length);
722      for (int i = 0; i < length; i++) {
723        order.add(i);
724      }
725      Collections.shuffle(order);
726      return order;
727    }
728  }
729
730  static class RandomCandidateGenerator extends CandidateGenerator {
731
732    @Override
733    Cluster.Action generate(Cluster cluster) {
734
735      int thisServer = pickRandomServer(cluster);
736
737      // Pick the other server
738      int otherServer = pickOtherRandomServer(cluster, thisServer);
739
740      return pickRandomRegions(cluster, thisServer, otherServer);
741    }
742  }
743
744  static class LoadCandidateGenerator extends CandidateGenerator {
745
746    @Override
747    Cluster.Action generate(Cluster cluster) {
748      cluster.sortServersByRegionCount();
749      int thisServer = pickMostLoadedServer(cluster, -1);
750      int otherServer = pickLeastLoadedServer(cluster, thisServer);
751
752      return pickRandomRegions(cluster, thisServer, otherServer);
753    }
754
755    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
756      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
757
758      int index = 0;
759      while (servers[index] == null || servers[index] == thisServer) {
760        index++;
761        if (index == servers.length) {
762          return -1;
763        }
764      }
765      return servers[index];
766    }
767
768    private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
769      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
770
771      int index = servers.length - 1;
772      while (servers[index] == null || servers[index] == thisServer) {
773        index--;
774        if (index < 0) {
775          return -1;
776        }
777      }
778      return servers[index];
779    }
780  }
781
782  static class LocalityBasedCandidateGenerator extends CandidateGenerator {
783
    // When null, generate() falls back to a purely random move/swap between two servers.
    private MasterServices masterServices;

    /**
     * @param masterServices the master services to use; may be null (setConf passes whatever
     *        services are set at that time), in which case generate() picks random servers
     */
    LocalityBasedCandidateGenerator(MasterServices masterServices) {
      this.masterServices = masterServices;
    }
789
790    @Override
791    Cluster.Action generate(Cluster cluster) {
792      if (this.masterServices == null) {
793        int thisServer = pickRandomServer(cluster);
794        // Pick the other server
795        int otherServer = pickOtherRandomServer(cluster, thisServer);
796        return pickRandomRegions(cluster, thisServer, otherServer);
797      }
798
799      // Randomly iterate through regions until you find one that is not on ideal host
800      for (int region : getRandomIterationOrder(cluster.numRegions)) {
801        int currentServer = cluster.regionIndexToServerIndex[region];
802        if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
803          Optional<Action> potential = tryMoveOrSwap(
804              cluster,
805              currentServer,
806              region,
807              cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
808          );
809          if (potential.isPresent()) {
810            return potential.get();
811          }
812        }
813      }
814      return Cluster.NullAction;
815    }
816
817    /**
818     * Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
819     * Returns empty optional if no move can be found
820     */
821    private Optional<Action> tryMoveOrSwap(Cluster cluster,
822                                           int fromServer,
823                                           int fromRegion,
824                                           int toServer) {
825      // Try move first. We know apriori fromRegion has the highest locality on toServer
826      if (cluster.serverHasTooFewRegions(toServer)) {
827        return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
828      }
829
830      // Compare locality gain/loss from swapping fromRegion with regions on toServer
831      double fromRegionLocalityDelta =
832          getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
833      for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
834        int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
835        double toRegionLocalityDelta =
836            getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
837        // If locality would remain neutral or improve, attempt the swap
838        if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
839          return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
840        }
841      }
842
843      return Optional.absent();
844    }
845
846    private double getWeightedLocality(Cluster cluster, int region, int server) {
847      return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
848    }
849
850    void setServices(MasterServices services) {
851      this.masterServices = services;
852    }
853  }
854
855  /**
856   * Generates candidates which moves the replicas out of the region server for
857   * co-hosted region replicas
858   */
859  static class RegionReplicaCandidateGenerator extends CandidateGenerator {
860
861    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
862
863    /**
864     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
865     * (a group is a server, host or rack)
866     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
867     * primariesOfRegionsPerHost or primariesOfRegionsPerRack
868     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
869     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
870     * @return a regionIndex for the selected primary or -1 if there is no co-locating
871     */
872    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
873        , int[] regionIndexToPrimaryIndex) {
874      int currentPrimary = -1;
875      int currentPrimaryIndex = -1;
876      int selectedPrimaryIndex = -1;
877      double currentLargestRandom = -1;
878      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
879      // ids for the regions hosted in server, a consecutive repetition means that replicas
880      // are co-hosted
881      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
882        int primary = j < primariesOfRegionsPerGroup.length
883            ? primariesOfRegionsPerGroup[j] : -1;
884        if (primary != currentPrimary) { // check for whether we see a new primary
885          int numReplicas = j - currentPrimaryIndex;
886          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
887            // decide to select this primary region id or not
888            double currentRandom = RANDOM.nextDouble();
889            // we don't know how many region replicas are co-hosted, we will randomly select one
890            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
891            if (currentRandom > currentLargestRandom) {
892              selectedPrimaryIndex = currentPrimary;
893              currentLargestRandom = currentRandom;
894            }
895          }
896          currentPrimary = primary;
897          currentPrimaryIndex = j;
898        }
899      }
900
901      // we have found the primary id for the region to move. Now find the actual regionIndex
902      // with the given primary, prefer to move the secondary region.
903      for (int j = 0; j < regionsPerGroup.length; j++) {
904        int regionIndex = regionsPerGroup[j];
905        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
906          // always move the secondary, not the primary
907          if (selectedPrimaryIndex != regionIndex) {
908            return regionIndex;
909          }
910        }
911      }
912      return -1;
913    }
914
915    @Override
916    Cluster.Action generate(Cluster cluster) {
917      int serverIndex = pickRandomServer(cluster);
918      if (cluster.numServers <= 1 || serverIndex == -1) {
919        return Cluster.NullAction;
920      }
921
922      int regionIndex = selectCoHostedRegionPerGroup(
923        cluster.primariesOfRegionsPerServer[serverIndex],
924        cluster.regionsPerServer[serverIndex],
925        cluster.regionIndexToPrimaryIndex);
926
927      // if there are no pairs of region replicas co-hosted, default to random generator
928      if (regionIndex == -1) {
929        // default to randompicker
930        return randomGenerator.generate(cluster);
931      }
932
933      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
934      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
935      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
936    }
937  }
938
939  /**
940   * Generates candidates which moves the replicas out of the rack for
941   * co-hosted region replicas in the same rack
942   */
943  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
944    @Override
945    Cluster.Action generate(Cluster cluster) {
946      int rackIndex = pickRandomRack(cluster);
947      if (cluster.numRacks <= 1 || rackIndex == -1) {
948        return super.generate(cluster);
949      }
950
951      int regionIndex = selectCoHostedRegionPerGroup(
952        cluster.primariesOfRegionsPerRack[rackIndex],
953        cluster.regionsPerRack[rackIndex],
954        cluster.regionIndexToPrimaryIndex);
955
956      // if there are no pairs of region replicas co-hosted, default to random generator
957      if (regionIndex == -1) {
958        // default to randompicker
959        return randomGenerator.generate(cluster);
960      }
961
962      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
963      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
964
965      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
966      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
967      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
968      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
969    }
970  }
971
972  /**
973   * Base class of StochasticLoadBalancer's Cost Functions.
974   */
975  abstract static class CostFunction {
976
977    private float multiplier = 0;
978
979    protected Cluster cluster;
980
981    CostFunction(Configuration c) {
982    }
983
984    boolean isNeeded() {
985      return true;
986    }
987    float getMultiplier() {
988      return multiplier;
989    }
990
991    void setMultiplier(float m) {
992      this.multiplier = m;
993    }
994
995    /** Called once per LB invocation to give the cost function
996     * to initialize it's state, and perform any costly calculation.
997     */
998    void init(Cluster cluster) {
999      this.cluster = cluster;
1000    }
1001
1002    /** Called once per cluster Action to give the cost function
1003     * an opportunity to update it's state. postAction() is always
1004     * called at least once before cost() is called with the cluster
1005     * that this action is performed on. */
1006    void postAction(Action action) {
1007      switch (action.type) {
1008      case NULL: break;
1009      case ASSIGN_REGION:
1010        AssignRegionAction ar = (AssignRegionAction) action;
1011        regionMoved(ar.region, -1, ar.server);
1012        break;
1013      case MOVE_REGION:
1014        MoveRegionAction mra = (MoveRegionAction) action;
1015        regionMoved(mra.region, mra.fromServer, mra.toServer);
1016        break;
1017      case SWAP_REGIONS:
1018        SwapRegionsAction a = (SwapRegionsAction) action;
1019        regionMoved(a.fromRegion, a.fromServer, a.toServer);
1020        regionMoved(a.toRegion, a.toServer, a.fromServer);
1021        break;
1022      default:
1023        throw new RuntimeException("Uknown action:" + action.type);
1024      }
1025    }
1026
1027    protected void regionMoved(int region, int oldServer, int newServer) {
1028    }
1029
1030    abstract double cost();
1031
1032    @SuppressWarnings("checkstyle:linelength")
1033    /**
1034     * Function to compute a scaled cost using
1035     * {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
1036     * It assumes that this is a zero sum set of costs.  It assumes that the worst case
1037     * possible is all of the elements in one region server and the rest having 0.
1038     *
1039     * @param stats the costs
1040     * @return a scaled set of costs.
1041     */
1042    protected double costFromArray(double[] stats) {
1043      double totalCost = 0;
1044      double total = getSum(stats);
1045
1046      double count = stats.length;
1047      double mean = total/count;
1048
1049      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
1050      // a zero sum cost for this to make sense.
1051      double max = ((count - 1) * mean) + (total - mean);
1052
1053      // It's possible that there aren't enough regions to go around
1054      double min;
1055      if (count > total) {
1056        min = ((count - total) * mean) + ((1 - mean) * total);
1057      } else {
1058        // Some will have 1 more than everything else.
1059        int numHigh = (int) (total - (Math.floor(mean) * count));
1060        int numLow = (int) (count - numHigh);
1061
1062        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
1063
1064      }
1065      min = Math.max(0, min);
1066      for (int i=0; i<stats.length; i++) {
1067        double n = stats[i];
1068        double diff = Math.abs(mean - n);
1069        totalCost += diff;
1070      }
1071
1072      double scaled =  scale(min, max, totalCost);
1073      return scaled;
1074    }
1075
1076    private double getSum(double[] stats) {
1077      double total = 0;
1078      for(double s:stats) {
1079        total += s;
1080      }
1081      return total;
1082    }
1083
1084    /**
1085     * Scale the value between 0 and 1.
1086     *
1087     * @param min   Min value
1088     * @param max   The Max value
1089     * @param value The value to be scaled.
1090     * @return The scaled value.
1091     */
1092    protected double scale(double min, double max, double value) {
1093      if (max <= min || value <= min) {
1094        return 0;
1095      }
1096      if ((max - min) == 0) return 0;
1097
1098      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
1099    }
1100  }
1101
1102  /**
1103   * Given the starting state of the regions and a potential ending state
1104   * compute cost based upon the number of regions that have moved.
1105   */
1106  static class MoveCostFunction extends CostFunction {
1107    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
1108    private static final String MAX_MOVES_PERCENT_KEY =
1109        "hbase.master.balancer.stochastic.maxMovePercent";
1110    private static final float DEFAULT_MOVE_COST = 7;
1111    private static final int DEFAULT_MAX_MOVES = 600;
1112    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1113
1114    private final float maxMovesPercent;
1115
1116    MoveCostFunction(Configuration conf) {
1117      super(conf);
1118
1119      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
1120      // that large benefits are need to overcome the cost of a move.
1121      this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1122      // What percent of the number of regions a single run of the balancer can move.
1123      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1124    }
1125
1126    @Override
1127    double cost() {
1128      // Try and size the max number of Moves, but always be prepared to move some.
1129      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1130          DEFAULT_MAX_MOVES);
1131
1132      double moveCost = cluster.numMovedRegions;
1133
1134      // Don't let this single balance move more than the max moves.
1135      // This allows better scaling to accurately represent the actual cost of a move.
1136      if (moveCost > maxMoves) {
1137        return 1000000;   // return a number much greater than any of the other cost
1138      }
1139
1140      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
1141    }
1142  }
1143
1144  /**
1145   * Compute the cost of a potential cluster state from skew in number of
1146   * regions on a cluster.
1147   */
1148  static class RegionCountSkewCostFunction extends CostFunction {
1149    private static final String REGION_COUNT_SKEW_COST_KEY =
1150        "hbase.master.balancer.stochastic.regionCountCost";
1151    private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1152
1153    private double[] stats = null;
1154
1155    RegionCountSkewCostFunction(Configuration conf) {
1156      super(conf);
1157      // Load multiplier should be the greatest as it is the most general way to balance data.
1158      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1159    }
1160
1161    @Override
1162    double cost() {
1163      if (stats == null || stats.length != cluster.numServers) {
1164        stats = new double[cluster.numServers];
1165      }
1166
1167      for (int i =0; i < cluster.numServers; i++) {
1168        stats[i] = cluster.regionsPerServer[i].length;
1169      }
1170
1171      return costFromArray(stats);
1172    }
1173  }
1174
1175  /**
1176   * Compute the cost of a potential cluster state from skew in number of
1177   * primary regions on a cluster.
1178   */
1179  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1180    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1181        "hbase.master.balancer.stochastic.primaryRegionCountCost";
1182    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1183
1184    private double[] stats = null;
1185
1186    PrimaryRegionCountSkewCostFunction(Configuration conf) {
1187      super(conf);
1188      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1189      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1190        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1191    }
1192
1193    @Override
1194    double cost() {
1195      if (!cluster.hasRegionReplicas) {
1196        return 0;
1197      }
1198      if (stats == null || stats.length != cluster.numServers) {
1199        stats = new double[cluster.numServers];
1200      }
1201
1202      for (int i = 0; i < cluster.numServers; i++) {
1203        stats[i] = 0;
1204        for (int regionIdx : cluster.regionsPerServer[i]) {
1205          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1206            stats[i]++;
1207          }
1208        }
1209      }
1210
1211      return costFromArray(stats);
1212    }
1213  }
1214
1215  /**
1216   * Compute the cost of a potential cluster configuration based upon how evenly
1217   * distributed tables are.
1218   */
1219  static class TableSkewCostFunction extends CostFunction {
1220
1221    private static final String TABLE_SKEW_COST_KEY =
1222        "hbase.master.balancer.stochastic.tableSkewCost";
1223    private static final float DEFAULT_TABLE_SKEW_COST = 35;
1224
1225    TableSkewCostFunction(Configuration conf) {
1226      super(conf);
1227      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1228    }
1229
1230    @Override
1231    double cost() {
1232      double max = cluster.numRegions;
1233      double min = ((double) cluster.numRegions) / cluster.numServers;
1234      double value = 0;
1235
1236      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1237        value += cluster.numMaxRegionsPerTable[i];
1238      }
1239
1240      return scale(min, max, value);
1241    }
1242  }
1243
1244  /**
1245   * Compute a cost of a potential cluster configuration based upon where
1246   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
1247   */
1248  static abstract class LocalityBasedCostFunction extends CostFunction {
1249
1250    private final LocalityType type;
1251
1252    private double bestLocality; // best case locality across cluster weighted by local data size
1253    private double locality; // current locality across cluster weighted by local data size
1254
1255    private MasterServices services;
1256
1257    LocalityBasedCostFunction(Configuration conf,
1258                              MasterServices srv,
1259                              LocalityType type,
1260                              String localityCostKey,
1261                              float defaultLocalityCost) {
1262      super(conf);
1263      this.type = type;
1264      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1265      this.services = srv;
1266      this.locality = 0.0;
1267      this.bestLocality = 0.0;
1268    }
1269
1270    /**
1271     * Maps region to the current entity (server or rack) on which it is stored
1272     */
1273    abstract int regionIndexToEntityIndex(int region);
1274
1275    public void setServices(MasterServices srvc) {
1276      this.services = srvc;
1277    }
1278
1279    @Override
1280    void init(Cluster cluster) {
1281      super.init(cluster);
1282      locality = 0.0;
1283      bestLocality = 0.0;
1284
1285      // If no master, no computation will work, so assume 0 cost
1286      if (this.services == null) {
1287        return;
1288      }
1289
1290      for (int region = 0; region < cluster.numRegions; region++) {
1291        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1292        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1293      }
1294
1295      // We normalize locality to be a score between 0 and 1.0 representing how good it
1296      // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1297      // (and the cost is 0)
1298      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1299    }
1300
1301    @Override
1302    protected void regionMoved(int region, int oldServer, int newServer) {
1303      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1304      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1305      if (this.services == null) {
1306        return;
1307      }
1308      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1309      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1310      locality += normalizedDelta;
1311    }
1312
1313    @Override
1314    double cost() {
1315      return 1 - locality;
1316    }
1317
1318    private int getMostLocalEntityForRegion(int region) {
1319      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1320    }
1321
1322    private double getWeightedLocality(int region, int entity) {
1323      return cluster.getOrComputeWeightedLocality(region, entity, type);
1324    }
1325
1326  }
1327
1328  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1329
1330    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1331    private static final float DEFAULT_LOCALITY_COST = 25;
1332
1333    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1334      super(
1335          conf,
1336          srv,
1337          LocalityType.SERVER,
1338          LOCALITY_COST_KEY,
1339          DEFAULT_LOCALITY_COST
1340      );
1341    }
1342
1343    @Override
1344    int regionIndexToEntityIndex(int region) {
1345      return cluster.regionIndexToServerIndex[region];
1346    }
1347  }
1348
1349  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1350
1351    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1352    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1353
1354    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1355      super(
1356          conf,
1357          services,
1358          LocalityType.RACK,
1359          RACK_LOCALITY_COST_KEY,
1360          DEFAULT_RACK_LOCALITY_COST
1361      );
1362    }
1363
1364    @Override
1365    int regionIndexToEntityIndex(int region) {
1366      return cluster.getRackForRegion(region);
1367    }
1368  }
1369
1370  /**
1371   * Base class the allows writing costs functions from rolling average of some
1372   * number from RegionLoad.
1373   */
1374  abstract static class CostFromRegionLoadFunction extends CostFunction {
1375
1376    private ClusterMetrics clusterStatus = null;
1377    private Map<String, Deque<BalancerRegionLoad>> loads = null;
1378    private double[] stats = null;
1379    CostFromRegionLoadFunction(Configuration conf) {
1380      super(conf);
1381    }
1382
1383    void setClusterMetrics(ClusterMetrics status) {
1384      this.clusterStatus = status;
1385    }
1386
1387    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1388      this.loads = l;
1389    }
1390
1391    @Override
1392    double cost() {
1393      if (clusterStatus == null || loads == null) {
1394        return 0;
1395      }
1396
1397      if (stats == null || stats.length != cluster.numServers) {
1398        stats = new double[cluster.numServers];
1399      }
1400
1401      for (int i =0; i < stats.length; i++) {
1402        //Cost this server has from RegionLoad
1403        long cost = 0;
1404
1405        // for every region on this server get the rl
1406        for(int regionIndex:cluster.regionsPerServer[i]) {
1407          Collection<BalancerRegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1408
1409          // Now if we found a region load get the type of cost that was requested.
1410          if (regionLoadList != null) {
1411            cost = (long) (cost + getRegionLoadCost(regionLoadList));
1412          }
1413        }
1414
1415        // Add the total cost to the stats.
1416        stats[i] = cost;
1417      }
1418
1419      // Now return the scaled cost from data held in the stats object.
1420      return costFromArray(stats);
1421    }
1422
1423    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1424      double cost = 0;
1425      for (BalancerRegionLoad rl : regionLoadList) {
1426        cost += getCostFromRl(rl);
1427      }
1428      return cost / regionLoadList.size();
1429    }
1430
1431    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1432  }
1433
1434  /**
1435   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
1436   * We do not compare about the actual rate in requests per second but rather the rate relative
1437   * to the rest of the regions.
1438   */
1439  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1440
1441    CostFromRegionLoadAsRateFunction(Configuration conf) {
1442      super(conf);
1443    }
1444
1445    @Override
1446    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1447      double cost = 0;
1448      double previous = 0;
1449      boolean isFirst = true;
1450      for (BalancerRegionLoad rl : regionLoadList) {
1451        double current = getCostFromRl(rl);
1452        if (isFirst) {
1453          isFirst = false;
1454        } else {
1455          cost += current - previous;
1456        }
1457        previous = current;
1458      }
1459      return Math.max(0, cost / (regionLoadList.size() - 1));
1460    }
1461  }
1462
1463  /**
1464   * Compute the cost of total number of read requests  The more unbalanced the higher the
1465   * computed cost will be.  This uses a rolling average of regionload.
1466   */
1467
1468  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1469
1470    private static final String READ_REQUEST_COST_KEY =
1471        "hbase.master.balancer.stochastic.readRequestCost";
1472    private static final float DEFAULT_READ_REQUEST_COST = 5;
1473
1474    ReadRequestCostFunction(Configuration conf) {
1475      super(conf);
1476      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1477    }
1478
1479    @Override
1480    protected double getCostFromRl(BalancerRegionLoad rl) {
1481      return rl.getReadRequestsCount();
1482    }
1483  }
1484
1485  /**
1486   * Compute the cost of total number of coprocessor requests  The more unbalanced the higher the
1487   * computed cost will be.  This uses a rolling average of regionload.
1488   */
1489
1490  static class CPRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1491
1492    private static final String CP_REQUEST_COST_KEY =
1493        "hbase.master.balancer.stochastic.cpRequestCost";
1494    private static final float DEFAULT_CP_REQUEST_COST = 5;
1495
1496    CPRequestCostFunction(Configuration conf) {
1497      super(conf);
1498      this.setMultiplier(conf.getFloat(CP_REQUEST_COST_KEY, DEFAULT_CP_REQUEST_COST));
1499    }
1500
1501    @Override
1502    protected double getCostFromRl(BalancerRegionLoad rl) {
1503      return rl.getCpRequestsCount();
1504    }
1505  }
1506
1507  /**
1508   * Compute the cost of total number of write requests.  The more unbalanced the higher the
1509   * computed cost will be.  This uses a rolling average of regionload.
1510   */
1511  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1512
1513    private static final String WRITE_REQUEST_COST_KEY =
1514        "hbase.master.balancer.stochastic.writeRequestCost";
1515    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1516
1517    WriteRequestCostFunction(Configuration conf) {
1518      super(conf);
1519      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1520    }
1521
1522    @Override
1523    protected double getCostFromRl(BalancerRegionLoad rl) {
1524      return rl.getWriteRequestsCount();
1525    }
1526  }
1527
1528  /**
1529   * A cost function for region replicas. We give a very high cost to hosting
1530   * replicas of the same region in the same host. We do not prevent the case
1531   * though, since if numReplicas > numRegionServers, we still want to keep the
1532   * replica open.
1533   */
1534  static class RegionReplicaHostCostFunction extends CostFunction {
1535    private static final String REGION_REPLICA_HOST_COST_KEY =
1536        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1537    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1538
1539    long maxCost = 0;
1540    long[] costsPerGroup; // group is either server, host or rack
1541    int[][] primariesOfRegionsPerGroup;
1542
1543    public RegionReplicaHostCostFunction(Configuration conf) {
1544      super(conf);
1545      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1546        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1547    }
1548
1549    @Override
1550    void init(Cluster cluster) {
1551      super.init(cluster);
1552      // max cost is the case where every region replica is hosted together regardless of host
1553      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1554      costsPerGroup = new long[cluster.numHosts];
1555      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1556          ? cluster.primariesOfRegionsPerHost
1557          : cluster.primariesOfRegionsPerServer;
1558      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1559        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1560      }
1561    }
1562
1563    long getMaxCost(Cluster cluster) {
1564      if (!cluster.hasRegionReplicas) {
1565        return 0; // short circuit
1566      }
1567      // max cost is the case where every region replica is hosted together regardless of host
1568      int[] primariesOfRegions = new int[cluster.numRegions];
1569      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1570          cluster.regions.length);
1571
1572      Arrays.sort(primariesOfRegions);
1573
1574      // compute numReplicas from the sorted array
1575      return costPerGroup(primariesOfRegions);
1576    }
1577
1578    @Override
1579    boolean isNeeded() {
1580      return cluster.hasRegionReplicas;
1581    }
1582
1583    @Override
1584    double cost() {
1585      if (maxCost <= 0) {
1586        return 0;
1587      }
1588
1589      long totalCost = 0;
1590      for (int i = 0 ; i < costsPerGroup.length; i++) {
1591        totalCost += costsPerGroup[i];
1592      }
1593      return scale(0, maxCost, totalCost);
1594    }
1595
1596    /**
1597     * For each primary region, it computes the total number of replicas in the array (numReplicas)
1598     * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1599     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1600     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1601     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1602     * @return a sum of numReplicas-1 squared for each primary region in the group.
1603     */
1604    protected long costPerGroup(int[] primariesOfRegions) {
1605      long cost = 0;
1606      int currentPrimary = -1;
1607      int currentPrimaryIndex = -1;
1608      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1609      // sharing the same primary will have consecutive numbers in the array.
1610      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1611        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1612        if (primary != currentPrimary) { // we see a new primary
1613          int numReplicas = j - currentPrimaryIndex;
1614          // square the cost
1615          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1616            cost += (numReplicas - 1) * (numReplicas - 1);
1617          }
1618          currentPrimary = primary;
1619          currentPrimaryIndex = j;
1620        }
1621      }
1622
1623      return cost;
1624    }
1625
1626    @Override
1627    protected void regionMoved(int region, int oldServer, int newServer) {
1628      if (maxCost <= 0) {
1629        return; // no need to compute
1630      }
1631      if (cluster.multiServersPerHost) {
1632        int oldHost = cluster.serverIndexToHostIndex[oldServer];
1633        int newHost = cluster.serverIndexToHostIndex[newServer];
1634        if (newHost != oldHost) {
1635          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1636          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1637        }
1638      } else {
1639        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1640        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1641      }
1642    }
1643  }
1644
1645  /**
1646   * A cost function for region replicas for the rack distribution. We give a relatively high
1647   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1648   * though.
1649   */
1650  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1651    private static final String REGION_REPLICA_RACK_COST_KEY =
1652        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1653    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1654
1655    public RegionReplicaRackCostFunction(Configuration conf) {
1656      super(conf);
1657      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1658        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1659    }
1660
1661    @Override
1662    void init(Cluster cluster) {
1663      this.cluster = cluster;
1664      if (cluster.numRacks <= 1) {
1665        maxCost = 0;
1666        return; // disabled for 1 rack
1667      }
1668      // max cost is the case where every region replica is hosted together regardless of rack
1669      maxCost = getMaxCost(cluster);
1670      costsPerGroup = new long[cluster.numRacks];
1671      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1672        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1673      }
1674    }
1675
1676    @Override
1677    protected void regionMoved(int region, int oldServer, int newServer) {
1678      if (maxCost <= 0) {
1679        return; // no need to compute
1680      }
1681      int oldRack = cluster.serverIndexToRackIndex[oldServer];
1682      int newRack = cluster.serverIndexToRackIndex[newServer];
1683      if (newRack != oldRack) {
1684        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1685        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1686      }
1687    }
1688  }
1689
1690  /**
1691   * Compute the cost of total memstore size.  The more unbalanced the higher the
1692   * computed cost will be.  This uses a rolling average of regionload.
1693   */
1694  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
1695
1696    private static final String MEMSTORE_SIZE_COST_KEY =
1697        "hbase.master.balancer.stochastic.memstoreSizeCost";
1698    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1699
1700    MemStoreSizeCostFunction(Configuration conf) {
1701      super(conf);
1702      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1703    }
1704
1705    @Override
1706    protected double getCostFromRl(BalancerRegionLoad rl) {
1707      return rl.getMemStoreSizeMB();
1708    }
1709  }
1710
1711  /**
1712   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1713   * computed cost will be.  This uses a rolling average of regionload.
1714   */
1715  static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1716
1717    private static final String STOREFILE_SIZE_COST_KEY =
1718        "hbase.master.balancer.stochastic.storefileSizeCost";
1719    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1720
1721    StoreFileCostFunction(Configuration conf) {
1722      super(conf);
1723      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1724    }
1725
1726    @Override
1727    protected double getCostFromRl(BalancerRegionLoad rl) {
1728      return rl.getStorefileSizeMB();
1729    }
1730  }
1731
1732  /**
1733   * A helper function to compose the attribute name from tablename and costfunction name
1734   */
1735  public static String composeAttributeName(String tableName, String costFunctionName) {
1736    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1737  }
1738}