001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Deque;
025import java.util.HashMap;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Objects;
030import java.util.Random;
031import java.util.stream.Collectors;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.ClusterMetrics;
034import org.apache.hadoop.hbase.HBaseInterfaceAudience;
035import org.apache.hadoop.hbase.RegionMetrics;
036import org.apache.hadoop.hbase.ServerMetrics;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.BalancerDecision;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.master.MasterServices;
042import org.apache.hadoop.hbase.master.RegionPlan;
043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
048import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
049import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
050import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
051import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.hadoop.hbase.util.ReflectionUtils;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
059
060/**
061 * <p>This is a best effort load balancer. Given a Cost function F(C) =&gt; x It will
062 * randomly try and mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
063 * new cluster state becomes the plan. It includes costs functions to compute the cost of:</p>
064 * <ul>
065 * <li>Region Load</li>
066 * <li>Table Load</li>
067 * <li>Data Locality</li>
068 * <li>Memstore Sizes</li>
069 * <li>Storefile Sizes</li>
070 * </ul>
071 *
072 *
073 * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
074 * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
075 * scaled by their respective multipliers:</p>
076 *
077 * <ul>
078 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
079 *   <li>hbase.master.balancer.stochastic.moveCost</li>
080 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
081 *   <li>hbase.master.balancer.stochastic.localityCost</li>
082 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
083 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
084 * </ul>
085 *
 * <p>You can also add custom cost functions by setting the following configuration value:</p>
087 * <ul>
088 *     <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
089 * </ul>
090 *
 * <p>All custom cost functions need to extend {@link StochasticLoadBalancer.CostFunction}.</p>
092 *
093 * <p>In addition to the above configurations, the balancer can be tuned by the following
094 * configuration values:</p>
095 * <ul>
096 *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
097 *   controls what the max number of regions that can be moved in a single invocation of this
098 *   balancer.</li>
099 *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
100 *   regions is multiplied to try and get the number of times the balancer will
101 *   mutate all servers.</li>
102 *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
103 *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
104 *   value and the above computation.</li>
105 * </ul>
106 *
107 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
108 * so that the balancer gets the full picture of all loads on the cluster.</p>
109 */
110@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
111@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
112  justification="Complaint is about costFunctions not being synchronized; not end of the world")
113public class StochasticLoadBalancer extends BaseLoadBalancer {
114
  // Configuration keys. All but COST_FUNCTIONS_COST_FUNCTIONS_KEY are read in setConf(), which
  // uses the current field value as the default when a key is absent.

  // Multiplied by the region and server counts in balanceTable() to size the random walk.
  protected static final String STEPS_PER_REGION_KEY =
      "hbase.master.balancer.stochastic.stepsPerRegion";
  // Cap (or, with runMaxSteps, floor) on the number of walk steps.
  protected static final String MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.maxSteps";
  // When true, balanceTable() takes the larger of maxSteps and the computed step count.
  protected static final String RUN_MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.runMaxSteps";
  // Wall-clock budget for one balanceTable() walk, in milliseconds.
  protected static final String MAX_RUNNING_TIME_KEY =
      "hbase.master.balancer.stochastic.maxRunningTime";
  // Number of load samples kept per region in the loads map.
  protected static final String KEEP_REGION_LOADS =
      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  // NOTE(review): not referenced in the part of the file reviewed here; presumably used when
  // composing per-table metric names -- confirm.
  private static final String TABLE_FUNCTION_SEP = "_";
  // Threshold on (weighted total cost / sum of multipliers) below which needsBalance() skips.
  protected static final String MIN_COST_NEED_BALANCE_KEY =
      "hbase.master.balancer.stochastic.minCostNeedBalance";
  // Comma-separated class names of extra cost functions; read in loadCustomCostFunctions().
  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
          "hbase.master.balancer.stochastic.additionalCostFunctions";

  // Shared random source, seeded from the wall clock at class-load time.
  protected static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Recent load samples per region, keyed by region name string; rebuilt by updateRegionLoad().
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // values are defaults
  private int maxSteps = 1000000;
  private boolean runMaxSteps = false;
  private int stepsPerRegion = 800;
  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
  private int numRegionLoadsToRemember = 15;
  private float minCostNeedBalance = 0.05f;

  // Generators that nextAction() picks from at random; populated in setConf() unless a caller
  // has already supplied a list through setCandidateGenerators().
  private List<CandidateGenerator> candidateGenerators;
  // The four region-load based cost functions; they receive cluster metrics and load history.
  private CostFromRegionLoadFunction[] regionLoadFunctions;
  private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

  // to save and report costs to JMX
  private Double curOverallCost = 0d;
  // Scratch array filled by computeCost(); copied into curFunctionCosts when a step is accepted.
  private Double[] tempFunctionCosts;
  private Double[] curFunctionCosts;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  // Kept as fields so areSomeRegionReplicasColocated() can evaluate them directly.
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
160
161  /**
162   * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
163   * default MetricsBalancer
164   */
165  public StochasticLoadBalancer() {
166    super(new MetricsStochasticBalancer());
167  }
168
169  @Override
170  public void onConfigurationChange(Configuration conf) {
171    setConf(conf);
172  }
173
174  @Override
175  public synchronized void setConf(Configuration conf) {
176    super.setConf(conf);
177    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
178    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
179    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
180    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
181
182    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
183    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
184    if (localityCandidateGenerator == null) {
185      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
186    }
187    localityCost = new ServerLocalityCostFunction(conf, services);
188    rackLocalityCost = new RackLocalityCostFunction(conf, services);
189
190    if (this.candidateGenerators == null) {
191      candidateGenerators = Lists.newArrayList();
192      candidateGenerators.add(new RandomCandidateGenerator());
193      candidateGenerators.add(new LoadCandidateGenerator());
194      candidateGenerators.add(localityCandidateGenerator);
195      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
196    }
197    regionLoadFunctions = new CostFromRegionLoadFunction[] {
198      new ReadRequestCostFunction(conf),
199      new WriteRequestCostFunction(conf),
200      new MemStoreSizeCostFunction(conf),
201      new StoreFileCostFunction(conf)
202    };
203    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
204    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
205
206    costFunctions = new ArrayList<>();
207    addCostFunction(new RegionCountSkewCostFunction(conf));
208    addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
209    addCostFunction(new MoveCostFunction(conf));
210    addCostFunction(localityCost);
211    addCostFunction(rackLocalityCost);
212    addCostFunction(new TableSkewCostFunction(conf));
213    addCostFunction(regionReplicaHostCostFunction);
214    addCostFunction(regionReplicaRackCostFunction);
215    addCostFunction(regionLoadFunctions[0]);
216    addCostFunction(regionLoadFunctions[1]);
217    addCostFunction(regionLoadFunctions[2]);
218    addCostFunction(regionLoadFunctions[3]);
219    loadCustomCostFunctions(conf);
220
221    curFunctionCosts = new Double[costFunctions.size()];
222    tempFunctionCosts = new Double[costFunctions.size()];
223
224    boolean isBalancerDecisionRecording = getConf()
225      .getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
226        BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
227    if (this.namedQueueRecorder == null && isBalancerDecisionRecording) {
228      this.namedQueueRecorder = NamedQueueRecorder.getInstance(getConf());
229    }
230
231    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
232            ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
233            Arrays.toString(getCostFunctionNames()) + " etc.");
234  }
235
236  private void loadCustomCostFunctions(Configuration conf) {
237    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
238
239    if (null == functionsNames) {
240      return;
241    }
242
243    costFunctions.addAll(Arrays.stream(functionsNames).map(c -> {
244      Class<? extends CostFunction> klass = null;
245      try {
246        klass = (Class<? extends CostFunction>) Class.forName(c);
247      } catch (ClassNotFoundException e) {
248        LOG.warn("Cannot load class " + c + "': " + e.getMessage());
249      }
250      if (null == klass) {
251        return null;
252      }
253      CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
254      LOG.info(
255        "Successfully loaded custom CostFunction '" + reflected.getClass().getSimpleName() + "'");
256      return reflected;
257    }).filter(Objects::nonNull).collect(Collectors.toList()));
258  }
259
260  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
261    this.candidateGenerators = customCandidateGenerators;
262  }
263
264  @Override
265  protected void setSlop(Configuration conf) {
266    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
267  }
268
269  @Override
270  public synchronized void setClusterMetrics(ClusterMetrics st) {
271    super.setClusterMetrics(st);
272    updateRegionLoad();
273    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
274      cost.setClusterMetrics(st);
275    }
276
277    // update metrics size
278    try {
279      // by-table or ensemble mode
280      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
281      int functionsCount = getCostFunctionNames().length;
282
283      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
284    } catch (Exception e) {
285      LOG.error("failed to get the size of all tables", e);
286    }
287  }
288
289  /**
290   * Update the number of metrics that are reported to JMX
291   */
292  public void updateMetricsSize(int size) {
293    if (metricsBalancer instanceof MetricsStochasticBalancer) {
294        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
295    }
296  }
297
298  @Override
299  public synchronized void setMasterServices(MasterServices masterServices) {
300    super.setMasterServices(masterServices);
301    this.localityCost.setServices(masterServices);
302    this.rackLocalityCost.setServices(masterServices);
303    this.localityCandidateGenerator.setServices(masterServices);
304  }
305
306  @Override
307  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
308    regionReplicaHostCostFunction.init(c);
309    if (regionReplicaHostCostFunction.cost() > 0) return true;
310    regionReplicaRackCostFunction.init(c);
311    if (regionReplicaRackCostFunction.cost() > 0) return true;
312    return false;
313  }
314
315  @Override
316  protected boolean needsBalance(TableName tableName, Cluster cluster) {
317    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
318    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
319      if (LOG.isDebugEnabled()) {
320        LOG.debug("Not running balancer because only " + cs.getNumServers()
321            + " active regionserver(s)");
322      }
323      return false;
324    }
325    if (areSomeRegionReplicasColocated(cluster)) {
326      return true;
327    }
328
329    if (idleRegionServerExist(cluster)){
330      return true;
331    }
332
333    double total = 0.0;
334    float sumMultiplier = 0.0f;
335    for (CostFunction c : costFunctions) {
336      float multiplier = c.getMultiplier();
337      if (multiplier <= 0) {
338        LOG.trace("{} not needed because multiplier is <= 0", c.getClass().getSimpleName());
339        continue;
340      }
341      if (!c.isNeeded()) {
342        LOG.trace("{} not needed", c.getClass().getSimpleName());
343        continue;
344      }
345      sumMultiplier += multiplier;
346      total += c.cost() * multiplier;
347    }
348
349    boolean balanced = total <= 0 || sumMultiplier <= 0 ||
350        (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance);
351    if (LOG.isDebugEnabled()) {
352      LOG.debug("{} {}; total cost={}, sum multiplier={}; cost/multiplier to need a balance is {}",
353          balanced ? "Skipping load balancing because balanced" : "We need to load balance",
354          isByTable ? String.format("table (%s)", tableName) : "cluster",
355          total, sumMultiplier, minCostNeedBalance);
356      if (LOG.isTraceEnabled()) {
357        LOG.trace("Balance decision detailed function costs={}", functionCost());
358      }
359    }
360    return !balanced;
361  }
362
363  @InterfaceAudience.Private
364  Cluster.Action nextAction(Cluster cluster) {
365    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
366            .generate(cluster);
367  }
368
369  /**
370   * Given the cluster state this will try and approach an optimal balance. This
371   * should always approach the optimal state given enough steps.
372   */
373  @Override
374  public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
375    List<RegionInfo>> loadOfOneTable) {
376    List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable);
377    if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
378      return plans;
379    }
380
381    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
382      if (loadOfOneTable.size() <= 2) {
383        return null;
384      }
385      loadOfOneTable = new HashMap<>(loadOfOneTable);
386      loadOfOneTable.remove(masterServerName);
387    }
388
389    // On clusters with lots of HFileLinks or lots of reference files,
390    // instantiating the storefile infos can be quite expensive.
391    // Allow turning this feature off if the locality cost is not going to
392    // be used in any computations.
393    RegionLocationFinder finder = null;
394    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
395        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
396      finder = this.regionFinder;
397    }
398
399    //The clusterState that is given to this method contains the state
400    //of all the regions in the table(s) (that's true today)
401    // Keep track of servers to iterate through them.
402    Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager);
403
404    long startTime = EnvironmentEdgeManager.currentTime();
405
406    initCosts(cluster);
407
408    if (!needsBalance(tableName, cluster)) {
409      return null;
410    }
411
412    double currentCost = computeCost(cluster, Double.MAX_VALUE);
413    curOverallCost = currentCost;
414    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
415    double initCost = currentCost;
416    double newCost;
417
418    long computedMaxSteps;
419    if (runMaxSteps) {
420      computedMaxSteps = Math.max(this.maxSteps,
421          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
422    } else {
423      long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion *
424          (long)cluster.numServers;
425      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
426      if (calculatedMaxSteps > maxSteps) {
427        LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
428            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
429            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
430            + "(This config change does not require service restart)", calculatedMaxSteps,
431            maxSteps);
432      }
433    }
434    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
435        + functionCost() + " computedMaxSteps: " + computedMaxSteps);
436
437    final String initFunctionTotalCosts = totalCostsPerFunc();
438    // Perform a stochastic walk to see if we can get a good fit.
439    long step;
440
441    for (step = 0; step < computedMaxSteps; step++) {
442      Cluster.Action action = nextAction(cluster);
443
444      if (action.type == Type.NULL) {
445        continue;
446      }
447
448      cluster.doAction(action);
449      updateCostsWithAction(cluster, action);
450
451      newCost = computeCost(cluster, currentCost);
452
453      // Should this be kept?
454      if (newCost < currentCost) {
455        currentCost = newCost;
456
457        // save for JMX
458        curOverallCost = currentCost;
459        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
460      } else {
461        // Put things back the way they were before.
462        // TODO: undo by remembering old values
463        Action undoAction = action.undoAction();
464        cluster.doAction(undoAction);
465        updateCostsWithAction(cluster, undoAction);
466      }
467
468      if (EnvironmentEdgeManager.currentTime() - startTime >
469          maxRunningTime) {
470        break;
471      }
472    }
473    long endTime = EnvironmentEdgeManager.currentTime();
474
475    metricsBalancer.balanceCluster(endTime - startTime);
476
477    // update costs metrics
478    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
479    if (initCost > currentCost) {
480      plans = createRegionPlans(cluster);
481      LOG.info("Finished computing new load balance plan. Computation took {}" +
482        " to try {} different iterations.  Found a solution that moves " +
483        "{} regions; Going from a computed cost of {}" +
484        " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime),
485        step, plans.size(), initCost, currentCost);
486      sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
487      return plans;
488    }
489    LOG.info("Could not find a better load balance plan.  Tried {} different configurations in " +
490      "{}, and did not find anything with a computed cost less than {}", step,
491      java.time.Duration.ofMillis(endTime - startTime), initCost);
492    return null;
493  }
494
495  private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
496      double initCost, String initFunctionTotalCosts, long step) {
497    if (this.namedQueueRecorder != null) {
498      List<String> regionPlans = new ArrayList<>();
499      for (RegionPlan plan : plans) {
500        regionPlans.add(
501          "table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
502            + " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
503      }
504      BalancerDecision balancerDecision =
505        new BalancerDecision.Builder()
506          .setInitTotalCost(initCost)
507          .setInitialFunctionCosts(initFunctionTotalCosts)
508          .setComputedTotalCost(currentCost)
509          .setFinalFunctionCosts(totalCostsPerFunc())
510          .setComputedSteps(step)
511          .setRegionPlans(regionPlans).build();
512      namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision));
513    }
514  }
515
516  /**
517   * update costs to JMX
518   */
519  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
520    if (tableName == null) return;
521
522    // check if the metricsBalancer is MetricsStochasticBalancer before casting
523    if (metricsBalancer instanceof MetricsStochasticBalancer) {
524      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
525      // overall cost
526      balancer.updateStochasticCost(tableName.getNameAsString(),
527        "Overall", "Overall cost", overall);
528
529      // each cost function
530      for (int i = 0; i < costFunctions.size(); i++) {
531        CostFunction costFunction = costFunctions.get(i);
532        String costFunctionName = costFunction.getClass().getSimpleName();
533        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
534        // TODO: cost function may need a specific description
535        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
536          "The percent of " + costFunctionName, costPercent);
537      }
538    }
539  }
540
541  private void addCostFunction(CostFunction costFunction) {
542    if (costFunction.getMultiplier() > 0) {
543      costFunctions.add(costFunction);
544    }
545  }
546
547  private String functionCost() {
548    StringBuilder builder = new StringBuilder();
549    for (CostFunction c:costFunctions) {
550      builder.append(c.getClass().getSimpleName());
551      builder.append(" : (");
552      builder.append(c.getMultiplier());
553      builder.append(", ");
554      builder.append(c.cost());
555      builder.append("); ");
556    }
557    return builder.toString();
558  }
559
560  private String totalCostsPerFunc() {
561    StringBuilder builder = new StringBuilder();
562    for (CostFunction c : costFunctions) {
563      if (c.getMultiplier() * c.cost() > 0.0) {
564        builder.append(" ");
565        builder.append(c.getClass().getSimpleName());
566        builder.append(" : ");
567        builder.append(c.getMultiplier() * c.cost());
568        builder.append(";");
569      }
570    }
571    if (builder.length() > 0) {
572      builder.deleteCharAt(builder.length() - 1);
573    }
574    return builder.toString();
575  }
576
577  /**
578   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
579   * state.
580   *
581   * @param cluster The state of the cluster
582   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
583   */
584  private List<RegionPlan> createRegionPlans(Cluster cluster) {
585    List<RegionPlan> plans = new LinkedList<>();
586    for (int regionIndex = 0;
587         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
588      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
589      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
590
591      if (initialServerIndex != newServerIndex) {
592        RegionInfo region = cluster.regions[regionIndex];
593        ServerName initialServer = cluster.servers[initialServerIndex];
594        ServerName newServer = cluster.servers[newServerIndex];
595
596        if (LOG.isTraceEnabled()) {
597          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
598              + initialServer.getHostname() + " to " + newServer.getHostname());
599        }
600        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
601        plans.add(rp);
602      }
603    }
604    return plans;
605  }
606
607  /**
608   * Store the current region loads.
609   */
610  private synchronized void updateRegionLoad() {
611    // We create a new hashmap so that regions that are no longer there are removed.
612    // However we temporarily need the old loads so we can use them to keep the rolling average.
613    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
614    loads = new HashMap<>();
615
616    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
617      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
618        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
619        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
620        if (rLoads == null) {
621          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
622        } else if (rLoads.size() >= numRegionLoadsToRemember) {
623          rLoads.remove();
624        }
625        rLoads.add(new BalancerRegionLoad(rm));
626        loads.put(regionNameAsString, rLoads);
627      });
628    });
629
630    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
631      cost.setLoads(loads);
632    }
633  }
634
635  protected void initCosts(Cluster cluster) {
636    for (CostFunction c:costFunctions) {
637      c.init(cluster);
638    }
639  }
640
641  protected void updateCostsWithAction(Cluster cluster, Action action) {
642    for (CostFunction c : costFunctions) {
643      c.postAction(action);
644    }
645  }
646
647  /**
648   * Get the names of the cost functions
649   */
650  public String[] getCostFunctionNames() {
651    if (costFunctions == null) return null;
652    String[] ret = new String[costFunctions.size()];
653    for (int i = 0; i < costFunctions.size(); i++) {
654      CostFunction c = costFunctions.get(i);
655      ret[i] = c.getClass().getSimpleName();
656    }
657
658    return ret;
659  }
660
661  /**
662   * This is the main cost function.  It will compute a cost associated with a proposed cluster
663   * state.  All different costs will be combined with their multipliers to produce a double cost.
664   *
665   * @param cluster The state of the cluster
666   * @param previousCost the previous cost. This is used as an early out.
667   * @return a double of a cost associated with the proposed cluster state.  This cost is an
668   *         aggregate of all individual cost functions.
669   */
670  protected double computeCost(Cluster cluster, double previousCost) {
671    double total = 0;
672
673    for (int i = 0; i < costFunctions.size(); i++) {
674      CostFunction c = costFunctions.get(i);
675      this.tempFunctionCosts[i] = 0.0;
676
677      if (c.getMultiplier() <= 0) {
678        continue;
679      }
680
681      Float multiplier = c.getMultiplier();
682      Double cost = c.cost();
683
684      this.tempFunctionCosts[i] = multiplier*cost;
685      total += this.tempFunctionCosts[i];
686
687      if (total > previousCost) {
688        break;
689      }
690    }
691
692    return total;
693  }
694
695  static class RandomCandidateGenerator extends CandidateGenerator {
696
697    @Override
698    Cluster.Action generate(Cluster cluster) {
699
700      int thisServer = pickRandomServer(cluster);
701
702      // Pick the other server
703      int otherServer = pickOtherRandomServer(cluster, thisServer);
704
705      return pickRandomRegions(cluster, thisServer, otherServer);
706    }
707  }
708
709  /**
710   * Generates candidates which moves the replicas out of the rack for
711   * co-hosted region replicas in the same rack
712   */
713  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
714    @Override
715    Cluster.Action generate(Cluster cluster) {
716      int rackIndex = pickRandomRack(cluster);
717      if (cluster.numRacks <= 1 || rackIndex == -1) {
718        return super.generate(cluster);
719      }
720
721      int regionIndex = selectCoHostedRegionPerGroup(
722        cluster.primariesOfRegionsPerRack[rackIndex],
723        cluster.regionsPerRack[rackIndex],
724        cluster.regionIndexToPrimaryIndex);
725
726      // if there are no pairs of region replicas co-hosted, default to random generator
727      if (regionIndex == -1) {
728        // default to randompicker
729        return randomGenerator.generate(cluster);
730      }
731
732      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
733      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
734
735      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
736      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
737      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
738      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
739    }
740  }
741
742  /**
743   * Base class of StochasticLoadBalancer's Cost Functions.
744   */
745  public abstract static class CostFunction {
746
747    private float multiplier = 0;
748
749    protected Cluster cluster;
750
751    public CostFunction(Configuration c) {
752    }
753
754    boolean isNeeded() {
755      return true;
756    }
757    float getMultiplier() {
758      return multiplier;
759    }
760
761    void setMultiplier(float m) {
762      this.multiplier = m;
763    }
764
765    /** Called once per LB invocation to give the cost function
766     * to initialize it's state, and perform any costly calculation.
767     */
768    void init(Cluster cluster) {
769      this.cluster = cluster;
770    }
771
772    /** Called once per cluster Action to give the cost function
773     * an opportunity to update it's state. postAction() is always
774     * called at least once before cost() is called with the cluster
775     * that this action is performed on. */
776    void postAction(Action action) {
777      switch (action.type) {
778      case NULL: break;
779      case ASSIGN_REGION:
780        AssignRegionAction ar = (AssignRegionAction) action;
781        regionMoved(ar.region, -1, ar.server);
782        break;
783      case MOVE_REGION:
784        MoveRegionAction mra = (MoveRegionAction) action;
785        regionMoved(mra.region, mra.fromServer, mra.toServer);
786        break;
787      case SWAP_REGIONS:
788        SwapRegionsAction a = (SwapRegionsAction) action;
789        regionMoved(a.fromRegion, a.fromServer, a.toServer);
790        regionMoved(a.toRegion, a.toServer, a.fromServer);
791        break;
792      default:
793        throw new RuntimeException("Uknown action:" + action.type);
794      }
795    }
796
797    protected void regionMoved(int region, int oldServer, int newServer) {
798    }
799
800    protected abstract double cost();
801
802    @SuppressWarnings("checkstyle:linelength")
803    /**
804     * Function to compute a scaled cost using
805     * {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
806     * It assumes that this is a zero sum set of costs.  It assumes that the worst case
807     * possible is all of the elements in one region server and the rest having 0.
808     *
809     * @param stats the costs
810     * @return a scaled set of costs.
811     */
812    protected double costFromArray(double[] stats) {
813      double totalCost = 0;
814      double total = getSum(stats);
815
816      double count = stats.length;
817      double mean = total/count;
818
819      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
820      // a zero sum cost for this to make sense.
821      double max = ((count - 1) * mean) + (total - mean);
822
823      // It's possible that there aren't enough regions to go around
824      double min;
825      if (count > total) {
826        min = ((count - total) * mean) + ((1 - mean) * total);
827      } else {
828        // Some will have 1 more than everything else.
829        int numHigh = (int) (total - (Math.floor(mean) * count));
830        int numLow = (int) (count - numHigh);
831
832        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
833
834      }
835      min = Math.max(0, min);
836      for (int i=0; i<stats.length; i++) {
837        double n = stats[i];
838        double diff = Math.abs(mean - n);
839        totalCost += diff;
840      }
841
842      double scaled =  scale(min, max, totalCost);
843      return scaled;
844    }
845
846    private double getSum(double[] stats) {
847      double total = 0;
848      for(double s:stats) {
849        total += s;
850      }
851      return total;
852    }
853
854    /**
855     * Scale the value between 0 and 1.
856     *
857     * @param min   Min value
858     * @param max   The Max value
859     * @param value The value to be scaled.
860     * @return The scaled value.
861     */
862    protected double scale(double min, double max, double value) {
863      if (max <= min || value <= min) {
864        return 0;
865      }
866      if ((max - min) == 0) return 0;
867
868      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
869    }
870  }
871
872  /**
873   * Given the starting state of the regions and a potential ending state
874   * compute cost based upon the number of regions that have moved.
875   */
876  static class MoveCostFunction extends CostFunction {
877    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
878    private static final String MOVE_COST_OFFPEAK_KEY =
879      "hbase.master.balancer.stochastic.moveCost.offpeak";
880    private static final String MAX_MOVES_PERCENT_KEY =
881        "hbase.master.balancer.stochastic.maxMovePercent";
882    static final float DEFAULT_MOVE_COST = 7;
883    static final float DEFAULT_MOVE_COST_OFFPEAK = 3;
884    private static final int DEFAULT_MAX_MOVES = 600;
885    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
886
887    private final float maxMovesPercent;
888    private final Configuration conf;
889
890    MoveCostFunction(Configuration conf) {
891      super(conf);
892      this.conf = conf;
893      // What percent of the number of regions a single run of the balancer can move.
894      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
895    }
896
897    @Override
898    protected double cost() {
899      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
900      // that large benefits are need to overcome the cost of a move.
901      if (OffPeakHours.getInstance(conf).isOffPeakHour()) {
902        this.setMultiplier(conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK));
903      } else {
904        this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
905      }
906      // Try and size the max number of Moves, but always be prepared to move some.
907      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
908          DEFAULT_MAX_MOVES);
909
910      double moveCost = cluster.numMovedRegions;
911
912      // Don't let this single balance move more than the max moves.
913      // This allows better scaling to accurately represent the actual cost of a move.
914      if (moveCost > maxMoves) {
915        return 1000000;   // return a number much greater than any of the other cost
916      }
917
918      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
919    }
920  }
921
922  /**
923   * Compute the cost of a potential cluster state from skew in number of
924   * regions on a cluster.
925   */
926  static class RegionCountSkewCostFunction extends CostFunction {
927    static final String REGION_COUNT_SKEW_COST_KEY =
928        "hbase.master.balancer.stochastic.regionCountCost";
929    static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
930
931    private double[] stats = null;
932
933    RegionCountSkewCostFunction(Configuration conf) {
934      super(conf);
935      // Load multiplier should be the greatest as it is the most general way to balance data.
936      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
937    }
938
939    @Override
940    void init(Cluster cluster) {
941      super.init(cluster);
942      LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
943          cluster.numServers, cluster.numRegions);
944      if (LOG.isTraceEnabled()) {
945        for (int i =0; i < cluster.numServers; i++) {
946          LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(),
947              cluster.servers[i], cluster.regionsPerServer[i].length);
948        }
949      }
950    }
951
952    @Override
953    protected double cost() {
954      if (stats == null || stats.length != cluster.numServers) {
955        stats = new double[cluster.numServers];
956      }
957      for (int i =0; i < cluster.numServers; i++) {
958        stats[i] = cluster.regionsPerServer[i].length;
959      }
960      return costFromArray(stats);
961    }
962  }
963
964  /**
965   * Compute the cost of a potential cluster state from skew in number of
966   * primary regions on a cluster.
967   */
968  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
969    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
970        "hbase.master.balancer.stochastic.primaryRegionCountCost";
971    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
972
973    private double[] stats = null;
974
975    PrimaryRegionCountSkewCostFunction(Configuration conf) {
976      super(conf);
977      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
978      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
979        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
980    }
981
982    @Override
983    boolean isNeeded() {
984      return cluster.hasRegionReplicas;
985    }
986
987    @Override
988    protected double cost() {
989      if (!cluster.hasRegionReplicas) {
990        return 0;
991      }
992      if (stats == null || stats.length != cluster.numServers) {
993        stats = new double[cluster.numServers];
994      }
995
996      for (int i = 0; i < cluster.numServers; i++) {
997        stats[i] = 0;
998        for (int regionIdx : cluster.regionsPerServer[i]) {
999          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1000            stats[i]++;
1001          }
1002        }
1003      }
1004
1005      return costFromArray(stats);
1006    }
1007  }
1008
1009  /**
1010   * Compute the cost of a potential cluster configuration based upon how evenly
1011   * distributed tables are.
1012   */
1013  static class TableSkewCostFunction extends CostFunction {
1014
1015    private static final String TABLE_SKEW_COST_KEY =
1016        "hbase.master.balancer.stochastic.tableSkewCost";
1017    private static final float DEFAULT_TABLE_SKEW_COST = 35;
1018
1019    TableSkewCostFunction(Configuration conf) {
1020      super(conf);
1021      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1022    }
1023
1024    @Override
1025    protected double cost() {
1026      double max = cluster.numRegions;
1027      double min = ((double) cluster.numRegions) / cluster.numServers;
1028      double value = 0;
1029
1030      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1031        value += cluster.numMaxRegionsPerTable[i];
1032      }
1033
1034      return scale(min, max, value);
1035    }
1036  }
1037
1038  /**
1039   * Compute a cost of a potential cluster configuration based upon where
1040   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
1041   */
1042  static abstract class LocalityBasedCostFunction extends CostFunction {
1043
1044    private final LocalityType type;
1045
1046    private double bestLocality; // best case locality across cluster weighted by local data size
1047    private double locality; // current locality across cluster weighted by local data size
1048
1049    private MasterServices services;
1050
1051    LocalityBasedCostFunction(Configuration conf,
1052                              MasterServices srv,
1053                              LocalityType type,
1054                              String localityCostKey,
1055                              float defaultLocalityCost) {
1056      super(conf);
1057      this.type = type;
1058      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1059      this.services = srv;
1060      this.locality = 0.0;
1061      this.bestLocality = 0.0;
1062    }
1063
1064    /**
1065     * Maps region to the current entity (server or rack) on which it is stored
1066     */
1067    abstract int regionIndexToEntityIndex(int region);
1068
1069    public void setServices(MasterServices srvc) {
1070      this.services = srvc;
1071    }
1072
1073    @Override
1074    void init(Cluster cluster) {
1075      super.init(cluster);
1076      locality = 0.0;
1077      bestLocality = 0.0;
1078
1079      // If no master, no computation will work, so assume 0 cost
1080      if (this.services == null) {
1081        return;
1082      }
1083
1084      for (int region = 0; region < cluster.numRegions; region++) {
1085        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1086        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1087      }
1088
1089      // We normalize locality to be a score between 0 and 1.0 representing how good it
1090      // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1091      // (and the cost is 0)
1092      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1093    }
1094
1095    @Override
1096    protected void regionMoved(int region, int oldServer, int newServer) {
1097      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1098      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1099      if (this.services == null) {
1100        return;
1101      }
1102      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1103      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1104      locality += normalizedDelta;
1105    }
1106
1107    @Override
1108    protected double cost() {
1109      return 1 - locality;
1110    }
1111
1112    private int getMostLocalEntityForRegion(int region) {
1113      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1114    }
1115
1116    private double getWeightedLocality(int region, int entity) {
1117      return cluster.getOrComputeWeightedLocality(region, entity, type);
1118    }
1119
1120  }
1121
1122  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1123
1124    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1125    private static final float DEFAULT_LOCALITY_COST = 25;
1126
1127    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1128      super(
1129          conf,
1130          srv,
1131          LocalityType.SERVER,
1132          LOCALITY_COST_KEY,
1133          DEFAULT_LOCALITY_COST
1134      );
1135    }
1136
1137    @Override
1138    int regionIndexToEntityIndex(int region) {
1139      return cluster.regionIndexToServerIndex[region];
1140    }
1141  }
1142
1143  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1144
1145    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1146    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1147
1148    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1149      super(
1150          conf,
1151          services,
1152          LocalityType.RACK,
1153          RACK_LOCALITY_COST_KEY,
1154          DEFAULT_RACK_LOCALITY_COST
1155      );
1156    }
1157
1158    @Override
1159    int regionIndexToEntityIndex(int region) {
1160      return cluster.getRackForRegion(region);
1161    }
1162  }
1163
1164  /**
1165   * Base class the allows writing costs functions from rolling average of some
1166   * number from RegionLoad.
1167   */
1168  abstract static class CostFromRegionLoadFunction extends CostFunction {
1169
1170    private ClusterMetrics clusterStatus = null;
1171    private Map<String, Deque<BalancerRegionLoad>> loads = null;
1172    private double[] stats = null;
1173    CostFromRegionLoadFunction(Configuration conf) {
1174      super(conf);
1175    }
1176
1177    void setClusterMetrics(ClusterMetrics status) {
1178      this.clusterStatus = status;
1179    }
1180
1181    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1182      this.loads = l;
1183    }
1184
1185    @Override
1186    protected double cost() {
1187      if (clusterStatus == null || loads == null) {
1188        return 0;
1189      }
1190
1191      if (stats == null || stats.length != cluster.numServers) {
1192        stats = new double[cluster.numServers];
1193      }
1194
1195      for (int i =0; i < stats.length; i++) {
1196        //Cost this server has from RegionLoad
1197        long cost = 0;
1198
1199        // for every region on this server get the rl
1200        for(int regionIndex:cluster.regionsPerServer[i]) {
1201          Collection<BalancerRegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1202
1203          // Now if we found a region load get the type of cost that was requested.
1204          if (regionLoadList != null) {
1205            cost = (long) (cost + getRegionLoadCost(regionLoadList));
1206          }
1207        }
1208
1209        // Add the total cost to the stats.
1210        stats[i] = cost;
1211      }
1212
1213      // Now return the scaled cost from data held in the stats object.
1214      return costFromArray(stats);
1215    }
1216
1217    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1218      double cost = 0;
1219      for (BalancerRegionLoad rl : regionLoadList) {
1220        cost += getCostFromRl(rl);
1221      }
1222      return cost / regionLoadList.size();
1223    }
1224
1225    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1226  }
1227
1228  /**
1229   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
1230   * We do not compare about the actual rate in requests per second but rather the rate relative
1231   * to the rest of the regions.
1232   */
1233  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1234
1235    CostFromRegionLoadAsRateFunction(Configuration conf) {
1236      super(conf);
1237    }
1238
1239    @Override
1240    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1241      double cost = 0;
1242      double previous = 0;
1243      boolean isFirst = true;
1244      for (BalancerRegionLoad rl : regionLoadList) {
1245        double current = getCostFromRl(rl);
1246        if (isFirst) {
1247          isFirst = false;
1248        } else {
1249          cost += current - previous;
1250        }
1251        previous = current;
1252      }
1253      return Math.max(0, cost / (regionLoadList.size() - 1));
1254    }
1255  }
1256
1257  /**
1258   * Compute the cost of total number of read requests  The more unbalanced the higher the
1259   * computed cost will be.  This uses a rolling average of regionload.
1260   */
1261
1262  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1263
1264    private static final String READ_REQUEST_COST_KEY =
1265        "hbase.master.balancer.stochastic.readRequestCost";
1266    private static final float DEFAULT_READ_REQUEST_COST = 5;
1267
1268    ReadRequestCostFunction(Configuration conf) {
1269      super(conf);
1270      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1271    }
1272
1273    @Override
1274    protected double getCostFromRl(BalancerRegionLoad rl) {
1275      return rl.getReadRequestsCount();
1276    }
1277  }
1278
1279  /**
1280   * Compute the cost of total number of write requests.  The more unbalanced the higher the
1281   * computed cost will be.  This uses a rolling average of regionload.
1282   */
1283  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1284
1285    private static final String WRITE_REQUEST_COST_KEY =
1286        "hbase.master.balancer.stochastic.writeRequestCost";
1287    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1288
1289    WriteRequestCostFunction(Configuration conf) {
1290      super(conf);
1291      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1292    }
1293
1294    @Override
1295    protected double getCostFromRl(BalancerRegionLoad rl) {
1296      return rl.getWriteRequestsCount();
1297    }
1298  }
1299
1300  /**
1301   * A cost function for region replicas. We give a very high cost to hosting
1302   * replicas of the same region in the same host. We do not prevent the case
1303   * though, since if numReplicas > numRegionServers, we still want to keep the
1304   * replica open.
1305   */
1306  static class RegionReplicaHostCostFunction extends CostFunction {
1307    private static final String REGION_REPLICA_HOST_COST_KEY =
1308        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1309    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1310
1311    long maxCost = 0;
1312    long[] costsPerGroup; // group is either server, host or rack
1313    int[][] primariesOfRegionsPerGroup;
1314
1315    public RegionReplicaHostCostFunction(Configuration conf) {
1316      super(conf);
1317      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1318        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1319    }
1320
1321    @Override
1322    void init(Cluster cluster) {
1323      super.init(cluster);
1324      // max cost is the case where every region replica is hosted together regardless of host
1325      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1326      costsPerGroup = new long[cluster.numHosts];
1327      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1328          ? cluster.primariesOfRegionsPerHost
1329          : cluster.primariesOfRegionsPerServer;
1330      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1331        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1332      }
1333    }
1334
1335    long getMaxCost(Cluster cluster) {
1336      if (!cluster.hasRegionReplicas) {
1337        return 0; // short circuit
1338      }
1339      // max cost is the case where every region replica is hosted together regardless of host
1340      int[] primariesOfRegions = new int[cluster.numRegions];
1341      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1342          cluster.regions.length);
1343
1344      Arrays.sort(primariesOfRegions);
1345
1346      // compute numReplicas from the sorted array
1347      return costPerGroup(primariesOfRegions);
1348    }
1349
1350    @Override
1351    boolean isNeeded() {
1352      return cluster.hasRegionReplicas;
1353    }
1354
1355    @Override
1356    protected double cost() {
1357      if (maxCost <= 0) {
1358        return 0;
1359      }
1360
1361      long totalCost = 0;
1362      for (int i = 0 ; i < costsPerGroup.length; i++) {
1363        totalCost += costsPerGroup[i];
1364      }
1365      return scale(0, maxCost, totalCost);
1366    }
1367
1368    /**
1369     * For each primary region, it computes the total number of replicas in the array (numReplicas)
1370     * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1371     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1372     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1373     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1374     * @return a sum of numReplicas-1 squared for each primary region in the group.
1375     */
1376    protected long costPerGroup(int[] primariesOfRegions) {
1377      long cost = 0;
1378      int currentPrimary = -1;
1379      int currentPrimaryIndex = -1;
1380      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1381      // sharing the same primary will have consecutive numbers in the array.
1382      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1383        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1384        if (primary != currentPrimary) { // we see a new primary
1385          int numReplicas = j - currentPrimaryIndex;
1386          // square the cost
1387          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1388            cost += (numReplicas - 1) * (numReplicas - 1);
1389          }
1390          currentPrimary = primary;
1391          currentPrimaryIndex = j;
1392        }
1393      }
1394
1395      return cost;
1396    }
1397
1398    @Override
1399    protected void regionMoved(int region, int oldServer, int newServer) {
1400      if (maxCost <= 0) {
1401        return; // no need to compute
1402      }
1403      if (cluster.multiServersPerHost) {
1404        int oldHost = cluster.serverIndexToHostIndex[oldServer];
1405        int newHost = cluster.serverIndexToHostIndex[newServer];
1406        if (newHost != oldHost) {
1407          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1408          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1409        }
1410      } else {
1411        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1412        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1413      }
1414    }
1415  }
1416
1417  /**
1418   * A cost function for region replicas for the rack distribution. We give a relatively high
1419   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1420   * though.
1421   */
1422  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1423    private static final String REGION_REPLICA_RACK_COST_KEY =
1424        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1425    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1426
1427    public RegionReplicaRackCostFunction(Configuration conf) {
1428      super(conf);
1429      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1430        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1431    }
1432
1433    @Override
1434    void init(Cluster cluster) {
1435      this.cluster = cluster;
1436      if (cluster.numRacks <= 1) {
1437        maxCost = 0;
1438        return; // disabled for 1 rack
1439      }
1440      // max cost is the case where every region replica is hosted together regardless of rack
1441      maxCost = getMaxCost(cluster);
1442      costsPerGroup = new long[cluster.numRacks];
1443      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1444        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1445      }
1446    }
1447
1448    @Override
1449    protected void regionMoved(int region, int oldServer, int newServer) {
1450      if (maxCost <= 0) {
1451        return; // no need to compute
1452      }
1453      int oldRack = cluster.serverIndexToRackIndex[oldServer];
1454      int newRack = cluster.serverIndexToRackIndex[newServer];
1455      if (newRack != oldRack) {
1456        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1457        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1458      }
1459    }
1460  }
1461
1462  /**
1463   * Compute the cost of total memstore size.  The more unbalanced the higher the
1464   * computed cost will be.  This uses a rolling average of regionload.
1465   */
1466  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
1467
1468    private static final String MEMSTORE_SIZE_COST_KEY =
1469        "hbase.master.balancer.stochastic.memstoreSizeCost";
1470    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1471
1472    MemStoreSizeCostFunction(Configuration conf) {
1473      super(conf);
1474      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1475    }
1476
1477    @Override
1478    protected double getCostFromRl(BalancerRegionLoad rl) {
1479      return rl.getMemStoreSizeMB();
1480    }
1481  }
1482  /**
1483   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1484   * computed cost will be.  This uses a rolling average of regionload.
1485   */
1486  static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1487
1488    private static final String STOREFILE_SIZE_COST_KEY =
1489        "hbase.master.balancer.stochastic.storefileSizeCost";
1490    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1491
1492    StoreFileCostFunction(Configuration conf) {
1493      super(conf);
1494      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1495    }
1496
1497    @Override
1498    protected double getCostFromRl(BalancerRegionLoad rl) {
1499      return rl.getStorefileSizeMB();
1500    }
1501  }
1502
1503  /**
1504   * A helper function to compose the attribute name from tablename and costfunction name
1505   */
1506  public static String composeAttributeName(String tableName, String costFunctionName) {
1507    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1508  }
1509}