001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayDeque;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Objects;
031import java.util.Random;
032import java.util.stream.Collectors;
033
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ClusterMetrics;
036import org.apache.hadoop.hbase.HBaseInterfaceAudience;
037import org.apache.hadoop.hbase.RegionMetrics;
038import org.apache.hadoop.hbase.ServerMetrics;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.master.MasterServices;
043import org.apache.hadoop.hbase.master.RegionPlan;
044import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
046import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
047import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
048import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
049import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.ReflectionUtils;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
057import org.apache.hbase.thirdparty.com.google.common.base.Optional;
058import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
059
060
061/**
 * <p>This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
065 * <ul>
066 * <li>Region Load</li>
067 * <li>Table Load</li>
068 * <li>Data Locality</li>
069 * <li>Memstore Sizes</li>
070 * <li>Storefile Sizes</li>
071 * </ul>
072 *
073 *
 * <p>Every cost function returns a number between 0 and 1 inclusive, where 0 is the lowest cost
 * (the best solution) and 1 is the highest possible cost (the worst solution). The computed costs
 * are scaled by their respective multipliers:</p>
077 *
078 * <ul>
079 *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
080 *   <li>hbase.master.balancer.stochastic.moveCost</li>
081 *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
082 *   <li>hbase.master.balancer.stochastic.localityCost</li>
083 *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
084 *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
085 * </ul>
086 *
 * <p>You can also add custom cost functions by setting the following configuration value:</p>
088 * <ul>
089 *     <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
090 * </ul>
091 *
 * <p>All custom cost functions need to extend {@link StochasticLoadBalancer.CostFunction}.</p>
093 *
094 * <p>In addition to the above configurations, the balancer can be tuned by the following
095 * configuration values:</p>
096 * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMoveRegions, which
 *   controls the maximum number of regions that can be moved in a single invocation of this
 *   balancer.</li>
100 *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
101 *   regions is multiplied to try and get the number of times the balancer will
102 *   mutate all servers.</li>
 *   <li>hbase.master.balancer.stochastic.maxSteps, which controls the maximum number of times that
 *   the balancer will try to mutate all the servers. The balancer will use the minimum of this
 *   value and the above computation (or the maximum of the two when
 *   hbase.master.balancer.stochastic.runMaxSteps is set to true).</li>
106 * </ul>
107 *
108 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
109 * so that the balancer gets the full picture of all loads on the cluster.</p>
110 */
111@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
112@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
113  justification="Complaint is about costFunctions not being synchronized; not end of the world")
114public class StochasticLoadBalancer extends BaseLoadBalancer {
115
  /** Coefficient multiplied by (regions x servers) to derive the step budget of one walk. */
  protected static final String STEPS_PER_REGION_KEY =
      "hbase.master.balancer.stochastic.stepsPerRegion";
  /** Upper bound on walk steps, or the lower bound when runMaxSteps is true. */
  protected static final String MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.maxSteps";
  /** When true, a walk runs at least maxSteps steps instead of being capped by it. */
  protected static final String RUN_MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.runMaxSteps";
  /** Wall-clock limit, in milliseconds, for a single stochastic walk. */
  protected static final String MAX_RUNNING_TIME_KEY =
      "hbase.master.balancer.stochastic.maxRunningTime";
  /** Number of historical region load samples kept per region. */
  protected static final String KEEP_REGION_LOADS =
      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  // NOTE(review): not referenced in the visible part of this file; presumably used further down
  // to build per-table metric names - confirm.
  private static final String TABLE_FUNCTION_SEP = "_";
  /** Weighted average cost below which needsBalance() reports the cluster as balanced. */
  protected static final String MIN_COST_NEED_BALANCE_KEY =
      "hbase.master.balancer.stochastic.minCostNeedBalance";
  /** Class names of additional CostFunction implementations (read with Configuration#getStrings). */
  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
          "hbase.master.balancer.stochastic.additionalCostFunctions";

  // Shared randomness for generator selection and server/region picking. It is seeded from the
  // wall clock, so walks are not reproducible across runs.
  protected static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Rolling history of region loads keyed by region name string; rebuilt by updateRegionLoad().
  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

  // values are defaults
  private int maxSteps = 1000000;
  private boolean runMaxSteps = false;
  private int stepsPerRegion = 800;
  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
  private int numRegionLoadsToRemember = 15;
  private float minCostNeedBalance = 0.05f;

  // One of these is picked uniformly at random on every step of the walk (see nextAction).
  private List<CandidateGenerator> candidateGenerators;
  // Cost functions fed with region load history; setConf also adds each of them to costFunctions.
  private CostFromRegionLoadFunction[] regionLoadFunctions;
  private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

  // to save and report costs to JMX
  // curOverallCost/curFunctionCosts hold the best state found so far in the current walk, and
  // tempFunctionCosts holds the weighted per-function costs of the last computeCost() call.
  // Both arrays are indexed in parallel with costFunctions.
  private Double curOverallCost = 0d;
  private Double[] tempFunctionCosts;
  private Double[] curFunctionCosts;

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private ServerLocalityCostFunction localityCost;
  private RackLocalityCostFunction rackLocalityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
161
  /**
   * Creates the balancer, passing a {@link MetricsStochasticBalancer} to {@link BaseLoadBalancer}
   * to replace its default MetricsBalancer. The stochastic-specific metric updates in this class
   * (metric-set size and per-cost-function costs) are only applied when {@code metricsBalancer}
   * is a MetricsStochasticBalancer.
   */
  public StochasticLoadBalancer() {
    super(new MetricsStochasticBalancer());
  }
169
170  @Override
171  public void onConfigurationChange(Configuration conf) {
172    setConf(conf);
173  }
174
175  @Override
176  public synchronized void setConf(Configuration conf) {
177    super.setConf(conf);
178    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
179    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
180    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
181    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
182
183    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
184    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
185    if (localityCandidateGenerator == null) {
186      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
187    }
188    localityCost = new ServerLocalityCostFunction(conf, services);
189    rackLocalityCost = new RackLocalityCostFunction(conf, services);
190
191    if (this.candidateGenerators == null) {
192      candidateGenerators = Lists.newArrayList();
193      candidateGenerators.add(new RandomCandidateGenerator());
194      candidateGenerators.add(new LoadCandidateGenerator());
195      candidateGenerators.add(localityCandidateGenerator);
196      candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
197    }
198    regionLoadFunctions = new CostFromRegionLoadFunction[] {
199      new ReadRequestCostFunction(conf),
200      new CPRequestCostFunction(conf),
201      new WriteRequestCostFunction(conf),
202      new MemStoreSizeCostFunction(conf),
203      new StoreFileCostFunction(conf)
204    };
205    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
206    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
207
208    costFunctions = new ArrayList<>();
209    costFunctions.add(new RegionCountSkewCostFunction(conf));
210    costFunctions.add(new PrimaryRegionCountSkewCostFunction(conf));
211    costFunctions.add(new MoveCostFunction(conf));
212    costFunctions.add(localityCost);
213    costFunctions.add(rackLocalityCost);
214    costFunctions.add(new TableSkewCostFunction(conf));
215    costFunctions.add(regionReplicaHostCostFunction);
216    costFunctions.add(regionReplicaRackCostFunction);
217    costFunctions.add(regionLoadFunctions[0]);
218    costFunctions.add(regionLoadFunctions[1]);
219    costFunctions.add(regionLoadFunctions[2]);
220    costFunctions.add(regionLoadFunctions[3]);
221    costFunctions.add(regionLoadFunctions[4]);
222    loadCustomCostFunctions(conf);
223
224    curFunctionCosts= new Double[costFunctions.size()];
225    tempFunctionCosts= new Double[costFunctions.size()];
226    LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
227            ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
228            Arrays.toString(getCostFunctionNames()) + " etc.");
229  }
230
231  private void loadCustomCostFunctions(Configuration conf) {
232    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
233
234    if (null == functionsNames) {
235      return;
236    }
237
238    costFunctions.addAll(Arrays.stream(functionsNames)
239            .map(c -> {
240              Class<? extends CostFunction> klass = null;
241              try {
242                klass = (Class<? extends CostFunction>) Class.forName(c);
243              } catch (ClassNotFoundException e) {
244                LOG.warn("Cannot load class " + c + "': " + e.getMessage());
245              }
246              if (null == klass) {
247                return null;
248              }
249
250              CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
251              LOG.info("Successfully loaded custom CostFunction '" +
252                      reflected.getClass().getSimpleName() + "'");
253
254              return reflected;
255            })
256            .filter(Objects::nonNull)
257            .collect(Collectors.toList()));
258  }
259
260  protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
261    this.candidateGenerators = customCandidateGenerators;
262  }
263
264  @Override
265  protected void setSlop(Configuration conf) {
266    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
267  }
268
269  @Override
270  public synchronized void setClusterMetrics(ClusterMetrics st) {
271    super.setClusterMetrics(st);
272    updateRegionLoad();
273    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
274      cost.setClusterMetrics(st);
275    }
276
277    // update metrics size
278    try {
279      // by-table or ensemble mode
280      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
281      int functionsCount = getCostFunctionNames().length;
282
283      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
284    } catch (Exception e) {
285      LOG.error("failed to get the size of all tables", e);
286    }
287  }
288
289  /**
290   * Update the number of metrics that are reported to JMX
291   */
292  public void updateMetricsSize(int size) {
293    if (metricsBalancer instanceof MetricsStochasticBalancer) {
294        ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
295    }
296  }
297
298  @Override
299  public synchronized void setMasterServices(MasterServices masterServices) {
300    super.setMasterServices(masterServices);
301    this.localityCost.setServices(masterServices);
302    this.rackLocalityCost.setServices(masterServices);
303    this.localityCandidateGenerator.setServices(masterServices);
304  }
305
306  @Override
307  protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
308    regionReplicaHostCostFunction.init(c);
309    if (regionReplicaHostCostFunction.cost() > 0) return true;
310    regionReplicaRackCostFunction.init(c);
311    if (regionReplicaRackCostFunction.cost() > 0) return true;
312    return false;
313  }
314
315  @Override
316  protected boolean needsBalance(TableName tableName, Cluster cluster) {
317    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
318    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
319      if (LOG.isDebugEnabled()) {
320        LOG.debug("Not running balancer because only " + cs.getNumServers()
321            + " active regionserver(s)");
322      }
323      return false;
324    }
325    if (areSomeRegionReplicasColocated(cluster)) {
326      return true;
327    }
328
329    double total = 0.0;
330    float sumMultiplier = 0.0f;
331    for (CostFunction c : costFunctions) {
332      float multiplier = c.getMultiplier();
333      if (multiplier <= 0) {
334        continue;
335      }
336      if (!c.isNeeded()) {
337        LOG.debug("{} not needed", c.getClass().getSimpleName());
338        continue;
339      }
340      sumMultiplier += multiplier;
341      total += c.cost() * multiplier;
342    }
343
344    if (total <= 0 || sumMultiplier <= 0
345        || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) {
346      if (LOG.isTraceEnabled()) {
347        final String loadBalanceTarget =
348            isByTable ? String.format("table (%s)", tableName) : "cluster";
349        LOG.trace("Skipping load balancing because the {} is balanced. Total cost: {}, "
350            + "Sum multiplier: {}, Minimum cost needed for balance: {}", loadBalanceTarget, total,
351            sumMultiplier, minCostNeedBalance);
352      }
353      return false;
354    }
355    return true;
356  }
357
358  @VisibleForTesting
359  Cluster.Action nextAction(Cluster cluster) {
360    return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
361            .generate(cluster);
362  }
363
364  /**
365   * Given the cluster state this will try and approach an optimal balance. This
366   * should always approach the optimal state given enough steps.
367   */
368  @Override
369  public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
370    List<RegionInfo>> loadOfOneTable) {
371    List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable);
372    if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
373      return plans;
374    }
375
376    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
377      if (loadOfOneTable.size() <= 2) {
378        return null;
379      }
380      loadOfOneTable = new HashMap<>(loadOfOneTable);
381      loadOfOneTable.remove(masterServerName);
382    }
383
384    // On clusters with lots of HFileLinks or lots of reference files,
385    // instantiating the storefile infos can be quite expensive.
386    // Allow turning this feature off if the locality cost is not going to
387    // be used in any computations.
388    RegionLocationFinder finder = null;
389    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
390        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
391      finder = this.regionFinder;
392    }
393
394    //The clusterState that is given to this method contains the state
395    //of all the regions in the table(s) (that's true today)
396    // Keep track of servers to iterate through them.
397    Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager);
398
399    long startTime = EnvironmentEdgeManager.currentTime();
400
401    initCosts(cluster);
402
403    if (!needsBalance(tableName, cluster)) {
404      return null;
405    }
406
407    double currentCost = computeCost(cluster, Double.MAX_VALUE);
408    curOverallCost = currentCost;
409    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
410    double initCost = currentCost;
411    double newCost = currentCost;
412
413    long computedMaxSteps;
414    if (runMaxSteps) {
415      computedMaxSteps = Math.max(this.maxSteps,
416          ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
417    } else {
418      long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion *
419          (long)cluster.numServers;
420      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
421      if (calculatedMaxSteps > maxSteps) {
422        LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
423            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
424            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
425            + "(This config change does not require service restart)", calculatedMaxSteps,
426            maxSteps);
427      }
428    }
429    LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
430        + functionCost() + " computedMaxSteps: " + computedMaxSteps);
431
432    // Perform a stochastic walk to see if we can get a good fit.
433    long step;
434
435    for (step = 0; step < computedMaxSteps; step++) {
436      Cluster.Action action = nextAction(cluster);
437
438      if (action.type == Type.NULL) {
439        continue;
440      }
441
442      cluster.doAction(action);
443      updateCostsWithAction(cluster, action);
444
445      newCost = computeCost(cluster, currentCost);
446
447      // Should this be kept?
448      if (newCost < currentCost) {
449        currentCost = newCost;
450
451        // save for JMX
452        curOverallCost = currentCost;
453        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
454      } else {
455        // Put things back the way they were before.
456        // TODO: undo by remembering old values
457        Action undoAction = action.undoAction();
458        cluster.doAction(undoAction);
459        updateCostsWithAction(cluster, undoAction);
460      }
461
462      if (EnvironmentEdgeManager.currentTime() - startTime >
463          maxRunningTime) {
464        break;
465      }
466    }
467    long endTime = EnvironmentEdgeManager.currentTime();
468
469    metricsBalancer.balanceCluster(endTime - startTime);
470
471    // update costs metrics
472    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
473    if (initCost > currentCost) {
474      plans = createRegionPlans(cluster);
475      LOG.info("Finished computing new load balance plan. Computation took {}" +
476        " to try {} different iterations.  Found a solution that moves " +
477        "{} regions; Going from a computed cost of {}" +
478        " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime),
479        step, plans.size(), initCost, currentCost);
480      return plans;
481    }
482    LOG.info("Could not find a better load balance plan.  Tried {} different configurations in " +
483      "{}, and did not find anything with a computed cost less than {}", step,
484      java.time.Duration.ofMillis(endTime - startTime), initCost);
485    return null;
486  }
487
488  /**
489   * update costs to JMX
490   */
491  private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
492    if (tableName == null) return;
493
494    // check if the metricsBalancer is MetricsStochasticBalancer before casting
495    if (metricsBalancer instanceof MetricsStochasticBalancer) {
496      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
497      // overall cost
498      balancer.updateStochasticCost(tableName.getNameAsString(),
499        "Overall", "Overall cost", overall);
500
501      // each cost function
502      for (int i = 0; i < costFunctions.size(); i++) {
503        CostFunction costFunction = costFunctions.get(i);
504        String costFunctionName = costFunction.getClass().getSimpleName();
505        Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
506        // TODO: cost function may need a specific description
507        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
508          "The percent of " + costFunctionName, costPercent);
509      }
510    }
511  }
512
513  private String functionCost() {
514    StringBuilder builder = new StringBuilder();
515    for (CostFunction c:costFunctions) {
516      builder.append(c.getClass().getSimpleName());
517      builder.append(" : (");
518      builder.append(c.getMultiplier());
519      builder.append(", ");
520      builder.append(c.cost());
521      builder.append("); ");
522    }
523    return builder.toString();
524  }
525
526  /**
527   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
528   * state.
529   *
530   * @param cluster The state of the cluster
531   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
532   */
533  private List<RegionPlan> createRegionPlans(Cluster cluster) {
534    List<RegionPlan> plans = new LinkedList<>();
535    for (int regionIndex = 0;
536         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
537      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
538      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
539
540      if (initialServerIndex != newServerIndex) {
541        RegionInfo region = cluster.regions[regionIndex];
542        ServerName initialServer = cluster.servers[initialServerIndex];
543        ServerName newServer = cluster.servers[newServerIndex];
544
545        if (LOG.isTraceEnabled()) {
546          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
547              + initialServer.getHostname() + " to " + newServer.getHostname());
548        }
549        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
550        plans.add(rp);
551      }
552    }
553    return plans;
554  }
555
556  /**
557   * Store the current region loads.
558   */
559  private synchronized void updateRegionLoad() {
560    // We create a new hashmap so that regions that are no longer there are removed.
561    // However we temporarily need the old loads so we can use them to keep the rolling average.
562    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
563    loads = new HashMap<>();
564
565    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
566      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
567        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
568        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
569        if (rLoads == null) {
570          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
571        } else if (rLoads.size() >= numRegionLoadsToRemember) {
572          rLoads.remove();
573        }
574        rLoads.add(new BalancerRegionLoad(rm));
575        loads.put(regionNameAsString, rLoads);
576      });
577    });
578
579    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
580      cost.setLoads(loads);
581    }
582  }
583
584  protected void initCosts(Cluster cluster) {
585    for (CostFunction c:costFunctions) {
586      c.init(cluster);
587    }
588  }
589
590  protected void updateCostsWithAction(Cluster cluster, Action action) {
591    for (CostFunction c : costFunctions) {
592      c.postAction(action);
593    }
594  }
595
596  /**
597   * Get the names of the cost functions
598   */
599  public String[] getCostFunctionNames() {
600    if (costFunctions == null) return null;
601    String[] ret = new String[costFunctions.size()];
602    for (int i = 0; i < costFunctions.size(); i++) {
603      CostFunction c = costFunctions.get(i);
604      ret[i] = c.getClass().getSimpleName();
605    }
606
607    return ret;
608  }
609
610  /**
611   * This is the main cost function.  It will compute a cost associated with a proposed cluster
612   * state.  All different costs will be combined with their multipliers to produce a double cost.
613   *
614   * @param cluster The state of the cluster
615   * @param previousCost the previous cost. This is used as an early out.
616   * @return a double of a cost associated with the proposed cluster state.  This cost is an
617   *         aggregate of all individual cost functions.
618   */
619  protected double computeCost(Cluster cluster, double previousCost) {
620    double total = 0;
621
622    for (int i = 0; i < costFunctions.size(); i++) {
623      CostFunction c = costFunctions.get(i);
624      this.tempFunctionCosts[i] = 0.0;
625
626      if (c.getMultiplier() <= 0) {
627        continue;
628      }
629
630      Float multiplier = c.getMultiplier();
631      Double cost = c.cost();
632
633      this.tempFunctionCosts[i] = multiplier*cost;
634      total += this.tempFunctionCosts[i];
635
636      if (total > previousCost) {
637        break;
638      }
639    }
640
641    return total;
642  }
643
644  /** Generates a candidate action to be applied to the cluster for cost function search */
645  abstract static class CandidateGenerator {
646    abstract Cluster.Action generate(Cluster cluster);
647
648    /**
649     * From a list of regions pick a random one. Null can be returned which
650     * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
651     * rather than swap.
652     *
653     * @param cluster        The state of the cluster
654     * @param server         index of the server
655     * @param chanceOfNoSwap Chance that this will decide to try a move rather
656     *                       than a swap.
657     * @return a random {@link RegionInfo} or null if an asymmetrical move is
658     *         suggested.
659     */
660    protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
661      // Check to see if this is just a move.
662      if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
663        // signal a move only.
664        return -1;
665      }
666      int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
667      return cluster.regionsPerServer[server][rand];
668
669    }
670    protected int pickRandomServer(Cluster cluster) {
671      if (cluster.numServers < 1) {
672        return -1;
673      }
674
675      return RANDOM.nextInt(cluster.numServers);
676    }
677
678    protected int pickRandomRack(Cluster cluster) {
679      if (cluster.numRacks < 1) {
680        return -1;
681      }
682
683      return RANDOM.nextInt(cluster.numRacks);
684    }
685
686    protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
687      if (cluster.numServers < 2) {
688        return -1;
689      }
690      while (true) {
691        int otherServerIndex = pickRandomServer(cluster);
692        if (otherServerIndex != serverIndex) {
693          return otherServerIndex;
694        }
695      }
696    }
697
698    protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
699      if (cluster.numRacks < 2) {
700        return -1;
701      }
702      while (true) {
703        int otherRackIndex = pickRandomRack(cluster);
704        if (otherRackIndex != rackIndex) {
705          return otherRackIndex;
706        }
707      }
708    }
709
710    protected Cluster.Action pickRandomRegions(Cluster cluster,
711                                                       int thisServer,
712                                                       int otherServer) {
713      if (thisServer < 0 || otherServer < 0) {
714        return Cluster.NullAction;
715      }
716
717      // Decide who is most likely to need another region
718      int thisRegionCount = cluster.getNumRegions(thisServer);
719      int otherRegionCount = cluster.getNumRegions(otherServer);
720
721      // Assign the chance based upon the above
722      double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
723      double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
724
725      int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
726      int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
727
728      return getAction(thisServer, thisRegion, otherServer, otherRegion);
729    }
730
731    protected Cluster.Action getAction(int fromServer, int fromRegion,
732        int toServer, int toRegion) {
733      if (fromServer < 0 || toServer < 0) {
734        return Cluster.NullAction;
735      }
736      if (fromRegion > 0 && toRegion > 0) {
737        return new Cluster.SwapRegionsAction(fromServer, fromRegion,
738          toServer, toRegion);
739      } else if (fromRegion > 0) {
740        return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
741      } else if (toRegion > 0) {
742        return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
743      } else {
744        return Cluster.NullAction;
745      }
746    }
747
748    /**
749     * Returns a random iteration order of indexes of an array with size length
750     */
751    protected List<Integer> getRandomIterationOrder(int length) {
752      ArrayList<Integer> order = new ArrayList<>(length);
753      for (int i = 0; i < length; i++) {
754        order.add(i);
755      }
756      Collections.shuffle(order);
757      return order;
758    }
759  }
760
761  static class RandomCandidateGenerator extends CandidateGenerator {
762
763    @Override
764    Cluster.Action generate(Cluster cluster) {
765
766      int thisServer = pickRandomServer(cluster);
767
768      // Pick the other server
769      int otherServer = pickOtherRandomServer(cluster, thisServer);
770
771      return pickRandomRegions(cluster, thisServer, otherServer);
772    }
773  }
774
775  static class LoadCandidateGenerator extends CandidateGenerator {
776
777    @Override
778    Cluster.Action generate(Cluster cluster) {
779      cluster.sortServersByRegionCount();
780      int thisServer = pickMostLoadedServer(cluster, -1);
781      int otherServer = pickLeastLoadedServer(cluster, thisServer);
782
783      return pickRandomRegions(cluster, thisServer, otherServer);
784    }
785
786    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
787      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
788
789      int index = 0;
790      while (servers[index] == null || servers[index] == thisServer) {
791        index++;
792        if (index == servers.length) {
793          return -1;
794        }
795      }
796      return servers[index];
797    }
798
799    private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
800      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
801
802      int index = servers.length - 1;
803      while (servers[index] == null || servers[index] == thisServer) {
804        index--;
805        if (index < 0) {
806          return -1;
807        }
808      }
809      return servers[index];
810    }
811  }
812
813  static class LocalityBasedCandidateGenerator extends CandidateGenerator {
814
815    private MasterServices masterServices;
816
817    LocalityBasedCandidateGenerator(MasterServices masterServices) {
818      this.masterServices = masterServices;
819    }
820
821    @Override
822    Cluster.Action generate(Cluster cluster) {
823      if (this.masterServices == null) {
824        int thisServer = pickRandomServer(cluster);
825        // Pick the other server
826        int otherServer = pickOtherRandomServer(cluster, thisServer);
827        return pickRandomRegions(cluster, thisServer, otherServer);
828      }
829
830      // Randomly iterate through regions until you find one that is not on ideal host
831      for (int region : getRandomIterationOrder(cluster.numRegions)) {
832        int currentServer = cluster.regionIndexToServerIndex[region];
833        if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
834          Optional<Action> potential = tryMoveOrSwap(
835              cluster,
836              currentServer,
837              region,
838              cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
839          );
840          if (potential.isPresent()) {
841            return potential.get();
842          }
843        }
844      }
845      return Cluster.NullAction;
846    }
847
848    /**
849     * Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
850     * Returns empty optional if no move can be found
851     */
852    private Optional<Action> tryMoveOrSwap(Cluster cluster,
853                                           int fromServer,
854                                           int fromRegion,
855                                           int toServer) {
856      // Try move first. We know apriori fromRegion has the highest locality on toServer
857      if (cluster.serverHasTooFewRegions(toServer)) {
858        return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
859      }
860
861      // Compare locality gain/loss from swapping fromRegion with regions on toServer
862      double fromRegionLocalityDelta =
863          getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
864      for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
865        int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
866        double toRegionLocalityDelta =
867            getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
868        // If locality would remain neutral or improve, attempt the swap
869        if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
870          return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
871        }
872      }
873
874      return Optional.absent();
875    }
876
877    private double getWeightedLocality(Cluster cluster, int region, int server) {
878      return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
879    }
880
881    void setServices(MasterServices services) {
882      this.masterServices = services;
883    }
884  }
885
886  /**
887   * Generates candidates which moves the replicas out of the region server for
888   * co-hosted region replicas
889   */
890  static class RegionReplicaCandidateGenerator extends CandidateGenerator {
891
892    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
893
894    /**
895     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
896     * (a group is a server, host or rack)
897     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
898     * primariesOfRegionsPerHost or primariesOfRegionsPerRack
899     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
900     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
901     * @return a regionIndex for the selected primary or -1 if there is no co-locating
902     */
903    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
904        , int[] regionIndexToPrimaryIndex) {
905      int currentPrimary = -1;
906      int currentPrimaryIndex = -1;
907      int selectedPrimaryIndex = -1;
908      double currentLargestRandom = -1;
909      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
910      // ids for the regions hosted in server, a consecutive repetition means that replicas
911      // are co-hosted
912      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
913        int primary = j < primariesOfRegionsPerGroup.length
914            ? primariesOfRegionsPerGroup[j] : -1;
915        if (primary != currentPrimary) { // check for whether we see a new primary
916          int numReplicas = j - currentPrimaryIndex;
917          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
918            // decide to select this primary region id or not
919            double currentRandom = RANDOM.nextDouble();
920            // we don't know how many region replicas are co-hosted, we will randomly select one
921            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
922            if (currentRandom > currentLargestRandom) {
923              selectedPrimaryIndex = currentPrimary;
924              currentLargestRandom = currentRandom;
925            }
926          }
927          currentPrimary = primary;
928          currentPrimaryIndex = j;
929        }
930      }
931
932      // we have found the primary id for the region to move. Now find the actual regionIndex
933      // with the given primary, prefer to move the secondary region.
934      for (int j = 0; j < regionsPerGroup.length; j++) {
935        int regionIndex = regionsPerGroup[j];
936        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
937          // always move the secondary, not the primary
938          if (selectedPrimaryIndex != regionIndex) {
939            return regionIndex;
940          }
941        }
942      }
943      return -1;
944    }
945
946    @Override
947    Cluster.Action generate(Cluster cluster) {
948      int serverIndex = pickRandomServer(cluster);
949      if (cluster.numServers <= 1 || serverIndex == -1) {
950        return Cluster.NullAction;
951      }
952
953      int regionIndex = selectCoHostedRegionPerGroup(
954        cluster.primariesOfRegionsPerServer[serverIndex],
955        cluster.regionsPerServer[serverIndex],
956        cluster.regionIndexToPrimaryIndex);
957
958      // if there are no pairs of region replicas co-hosted, default to random generator
959      if (regionIndex == -1) {
960        // default to randompicker
961        return randomGenerator.generate(cluster);
962      }
963
964      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
965      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
966      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
967    }
968  }
969
970  /**
971   * Generates candidates which moves the replicas out of the rack for
972   * co-hosted region replicas in the same rack
973   */
974  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
975    @Override
976    Cluster.Action generate(Cluster cluster) {
977      int rackIndex = pickRandomRack(cluster);
978      if (cluster.numRacks <= 1 || rackIndex == -1) {
979        return super.generate(cluster);
980      }
981
982      int regionIndex = selectCoHostedRegionPerGroup(
983        cluster.primariesOfRegionsPerRack[rackIndex],
984        cluster.regionsPerRack[rackIndex],
985        cluster.regionIndexToPrimaryIndex);
986
987      // if there are no pairs of region replicas co-hosted, default to random generator
988      if (regionIndex == -1) {
989        // default to randompicker
990        return randomGenerator.generate(cluster);
991      }
992
993      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
994      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
995
996      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
997      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
998      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
999      return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
1000    }
1001  }
1002
1003  /**
1004   * Base class of StochasticLoadBalancer's Cost Functions.
1005   */
1006  public abstract static class CostFunction {
1007
1008    private float multiplier = 0;
1009
1010    protected Cluster cluster;
1011
1012    public CostFunction(Configuration c) {
1013    }
1014
1015    boolean isNeeded() {
1016      return true;
1017    }
1018    float getMultiplier() {
1019      return multiplier;
1020    }
1021
1022    void setMultiplier(float m) {
1023      this.multiplier = m;
1024    }
1025
1026    /** Called once per LB invocation to give the cost function
1027     * to initialize it's state, and perform any costly calculation.
1028     */
1029    void init(Cluster cluster) {
1030      this.cluster = cluster;
1031    }
1032
1033    /** Called once per cluster Action to give the cost function
1034     * an opportunity to update it's state. postAction() is always
1035     * called at least once before cost() is called with the cluster
1036     * that this action is performed on. */
1037    void postAction(Action action) {
1038      switch (action.type) {
1039      case NULL: break;
1040      case ASSIGN_REGION:
1041        AssignRegionAction ar = (AssignRegionAction) action;
1042        regionMoved(ar.region, -1, ar.server);
1043        break;
1044      case MOVE_REGION:
1045        MoveRegionAction mra = (MoveRegionAction) action;
1046        regionMoved(mra.region, mra.fromServer, mra.toServer);
1047        break;
1048      case SWAP_REGIONS:
1049        SwapRegionsAction a = (SwapRegionsAction) action;
1050        regionMoved(a.fromRegion, a.fromServer, a.toServer);
1051        regionMoved(a.toRegion, a.toServer, a.fromServer);
1052        break;
1053      default:
1054        throw new RuntimeException("Uknown action:" + action.type);
1055      }
1056    }
1057
1058    protected void regionMoved(int region, int oldServer, int newServer) {
1059    }
1060
1061    protected abstract double cost();
1062
1063    @SuppressWarnings("checkstyle:linelength")
1064    /**
1065     * Function to compute a scaled cost using
1066     * {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
1067     * It assumes that this is a zero sum set of costs.  It assumes that the worst case
1068     * possible is all of the elements in one region server and the rest having 0.
1069     *
1070     * @param stats the costs
1071     * @return a scaled set of costs.
1072     */
1073    protected double costFromArray(double[] stats) {
1074      double totalCost = 0;
1075      double total = getSum(stats);
1076
1077      double count = stats.length;
1078      double mean = total/count;
1079
1080      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
1081      // a zero sum cost for this to make sense.
1082      double max = ((count - 1) * mean) + (total - mean);
1083
1084      // It's possible that there aren't enough regions to go around
1085      double min;
1086      if (count > total) {
1087        min = ((count - total) * mean) + ((1 - mean) * total);
1088      } else {
1089        // Some will have 1 more than everything else.
1090        int numHigh = (int) (total - (Math.floor(mean) * count));
1091        int numLow = (int) (count - numHigh);
1092
1093        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
1094
1095      }
1096      min = Math.max(0, min);
1097      for (int i=0; i<stats.length; i++) {
1098        double n = stats[i];
1099        double diff = Math.abs(mean - n);
1100        totalCost += diff;
1101      }
1102
1103      double scaled =  scale(min, max, totalCost);
1104      return scaled;
1105    }
1106
1107    private double getSum(double[] stats) {
1108      double total = 0;
1109      for(double s:stats) {
1110        total += s;
1111      }
1112      return total;
1113    }
1114
1115    /**
1116     * Scale the value between 0 and 1.
1117     *
1118     * @param min   Min value
1119     * @param max   The Max value
1120     * @param value The value to be scaled.
1121     * @return The scaled value.
1122     */
1123    protected double scale(double min, double max, double value) {
1124      if (max <= min || value <= min) {
1125        return 0;
1126      }
1127      if ((max - min) == 0) return 0;
1128
1129      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
1130    }
1131  }
1132
1133  /**
1134   * Given the starting state of the regions and a potential ending state
1135   * compute cost based upon the number of regions that have moved.
1136   */
1137  static class MoveCostFunction extends CostFunction {
1138    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
1139    private static final String MAX_MOVES_PERCENT_KEY =
1140        "hbase.master.balancer.stochastic.maxMovePercent";
1141    private static final float DEFAULT_MOVE_COST = 7;
1142    private static final int DEFAULT_MAX_MOVES = 600;
1143    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1144
1145    private final float maxMovesPercent;
1146
1147    MoveCostFunction(Configuration conf) {
1148      super(conf);
1149
1150      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
1151      // that large benefits are need to overcome the cost of a move.
1152      this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1153      // What percent of the number of regions a single run of the balancer can move.
1154      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1155    }
1156
1157    @Override
1158    protected double cost() {
1159      // Try and size the max number of Moves, but always be prepared to move some.
1160      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1161          DEFAULT_MAX_MOVES);
1162
1163      double moveCost = cluster.numMovedRegions;
1164
1165      // Don't let this single balance move more than the max moves.
1166      // This allows better scaling to accurately represent the actual cost of a move.
1167      if (moveCost > maxMoves) {
1168        return 1000000;   // return a number much greater than any of the other cost
1169      }
1170
1171      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
1172    }
1173  }
1174
1175  /**
1176   * Compute the cost of a potential cluster state from skew in number of
1177   * regions on a cluster.
1178   */
1179  static class RegionCountSkewCostFunction extends CostFunction {
1180    static final String REGION_COUNT_SKEW_COST_KEY =
1181        "hbase.master.balancer.stochastic.regionCountCost";
1182    static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1183
1184    private double[] stats = null;
1185
1186    RegionCountSkewCostFunction(Configuration conf) {
1187      super(conf);
1188      // Load multiplier should be the greatest as it is the most general way to balance data.
1189      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1190    }
1191
1192    @Override
1193    protected double cost() {
1194      if (stats == null || stats.length != cluster.numServers) {
1195        stats = new double[cluster.numServers];
1196      }
1197
1198      for (int i =0; i < cluster.numServers; i++) {
1199        stats[i] = cluster.regionsPerServer[i].length;
1200      }
1201
1202      return costFromArray(stats);
1203    }
1204  }
1205
1206  /**
1207   * Compute the cost of a potential cluster state from skew in number of
1208   * primary regions on a cluster.
1209   */
1210  static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1211    private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1212        "hbase.master.balancer.stochastic.primaryRegionCountCost";
1213    private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1214
1215    private double[] stats = null;
1216
1217    PrimaryRegionCountSkewCostFunction(Configuration conf) {
1218      super(conf);
1219      // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1220      this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1221        DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1222    }
1223
1224    @Override
1225    boolean isNeeded() {
1226      return cluster.hasRegionReplicas;
1227    }
1228
1229    @Override
1230    protected double cost() {
1231      if (!cluster.hasRegionReplicas) {
1232        return 0;
1233      }
1234      if (stats == null || stats.length != cluster.numServers) {
1235        stats = new double[cluster.numServers];
1236      }
1237
1238      for (int i = 0; i < cluster.numServers; i++) {
1239        stats[i] = 0;
1240        for (int regionIdx : cluster.regionsPerServer[i]) {
1241          if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1242            stats[i]++;
1243          }
1244        }
1245      }
1246
1247      return costFromArray(stats);
1248    }
1249  }
1250
1251  /**
1252   * Compute the cost of a potential cluster configuration based upon how evenly
1253   * distributed tables are.
1254   */
1255  static class TableSkewCostFunction extends CostFunction {
1256
1257    private static final String TABLE_SKEW_COST_KEY =
1258        "hbase.master.balancer.stochastic.tableSkewCost";
1259    private static final float DEFAULT_TABLE_SKEW_COST = 35;
1260
1261    TableSkewCostFunction(Configuration conf) {
1262      super(conf);
1263      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1264    }
1265
1266    @Override
1267    protected double cost() {
1268      double max = cluster.numRegions;
1269      double min = ((double) cluster.numRegions) / cluster.numServers;
1270      double value = 0;
1271
1272      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1273        value += cluster.numMaxRegionsPerTable[i];
1274      }
1275
1276      return scale(min, max, value);
1277    }
1278  }
1279
1280  /**
1281   * Compute a cost of a potential cluster configuration based upon where
1282   * {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
1283   */
1284  static abstract class LocalityBasedCostFunction extends CostFunction {
1285
1286    private final LocalityType type;
1287
1288    private double bestLocality; // best case locality across cluster weighted by local data size
1289    private double locality; // current locality across cluster weighted by local data size
1290
1291    private MasterServices services;
1292
1293    LocalityBasedCostFunction(Configuration conf,
1294                              MasterServices srv,
1295                              LocalityType type,
1296                              String localityCostKey,
1297                              float defaultLocalityCost) {
1298      super(conf);
1299      this.type = type;
1300      this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1301      this.services = srv;
1302      this.locality = 0.0;
1303      this.bestLocality = 0.0;
1304    }
1305
1306    /**
1307     * Maps region to the current entity (server or rack) on which it is stored
1308     */
1309    abstract int regionIndexToEntityIndex(int region);
1310
1311    public void setServices(MasterServices srvc) {
1312      this.services = srvc;
1313    }
1314
1315    @Override
1316    void init(Cluster cluster) {
1317      super.init(cluster);
1318      locality = 0.0;
1319      bestLocality = 0.0;
1320
1321      // If no master, no computation will work, so assume 0 cost
1322      if (this.services == null) {
1323        return;
1324      }
1325
1326      for (int region = 0; region < cluster.numRegions; region++) {
1327        locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1328        bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1329      }
1330
1331      // We normalize locality to be a score between 0 and 1.0 representing how good it
1332      // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1333      // (and the cost is 0)
1334      locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1335    }
1336
1337    @Override
1338    protected void regionMoved(int region, int oldServer, int newServer) {
1339      int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1340      int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1341      if (this.services == null) {
1342        return;
1343      }
1344      double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1345      double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1346      locality += normalizedDelta;
1347    }
1348
1349    @Override
1350    protected double cost() {
1351      return 1 - locality;
1352    }
1353
1354    private int getMostLocalEntityForRegion(int region) {
1355      return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1356    }
1357
1358    private double getWeightedLocality(int region, int entity) {
1359      return cluster.getOrComputeWeightedLocality(region, entity, type);
1360    }
1361
1362  }
1363
1364  static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1365
1366    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1367    private static final float DEFAULT_LOCALITY_COST = 25;
1368
1369    ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1370      super(
1371          conf,
1372          srv,
1373          LocalityType.SERVER,
1374          LOCALITY_COST_KEY,
1375          DEFAULT_LOCALITY_COST
1376      );
1377    }
1378
1379    @Override
1380    int regionIndexToEntityIndex(int region) {
1381      return cluster.regionIndexToServerIndex[region];
1382    }
1383  }
1384
1385  static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1386
1387    private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1388    private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1389
1390    public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1391      super(
1392          conf,
1393          services,
1394          LocalityType.RACK,
1395          RACK_LOCALITY_COST_KEY,
1396          DEFAULT_RACK_LOCALITY_COST
1397      );
1398    }
1399
1400    @Override
1401    int regionIndexToEntityIndex(int region) {
1402      return cluster.getRackForRegion(region);
1403    }
1404  }
1405
1406  /**
1407   * Base class the allows writing costs functions from rolling average of some
1408   * number from RegionLoad.
1409   */
1410  abstract static class CostFromRegionLoadFunction extends CostFunction {
1411
1412    private ClusterMetrics clusterStatus = null;
1413    private Map<String, Deque<BalancerRegionLoad>> loads = null;
1414    private double[] stats = null;
1415    CostFromRegionLoadFunction(Configuration conf) {
1416      super(conf);
1417    }
1418
1419    void setClusterMetrics(ClusterMetrics status) {
1420      this.clusterStatus = status;
1421    }
1422
1423    void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
1424      this.loads = l;
1425    }
1426
1427    @Override
1428    protected double cost() {
1429      if (clusterStatus == null || loads == null) {
1430        return 0;
1431      }
1432
1433      if (stats == null || stats.length != cluster.numServers) {
1434        stats = new double[cluster.numServers];
1435      }
1436
1437      for (int i =0; i < stats.length; i++) {
1438        //Cost this server has from RegionLoad
1439        long cost = 0;
1440
1441        // for every region on this server get the rl
1442        for(int regionIndex:cluster.regionsPerServer[i]) {
1443          Collection<BalancerRegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1444
1445          // Now if we found a region load get the type of cost that was requested.
1446          if (regionLoadList != null) {
1447            cost = (long) (cost + getRegionLoadCost(regionLoadList));
1448          }
1449        }
1450
1451        // Add the total cost to the stats.
1452        stats[i] = cost;
1453      }
1454
1455      // Now return the scaled cost from data held in the stats object.
1456      return costFromArray(stats);
1457    }
1458
1459    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1460      double cost = 0;
1461      for (BalancerRegionLoad rl : regionLoadList) {
1462        cost += getCostFromRl(rl);
1463      }
1464      return cost / regionLoadList.size();
1465    }
1466
1467    protected abstract double getCostFromRl(BalancerRegionLoad rl);
1468  }
1469
1470  /**
1471   * Class to be used for the subset of RegionLoad costs that should be treated as rates.
1472   * We do not compare about the actual rate in requests per second but rather the rate relative
1473   * to the rest of the regions.
1474   */
1475  abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
1476
1477    CostFromRegionLoadAsRateFunction(Configuration conf) {
1478      super(conf);
1479    }
1480
1481    @Override
1482    protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
1483      double cost = 0;
1484      double previous = 0;
1485      boolean isFirst = true;
1486      for (BalancerRegionLoad rl : regionLoadList) {
1487        double current = getCostFromRl(rl);
1488        if (isFirst) {
1489          isFirst = false;
1490        } else {
1491          cost += current - previous;
1492        }
1493        previous = current;
1494      }
1495      return Math.max(0, cost / (regionLoadList.size() - 1));
1496    }
1497  }
1498
1499  /**
1500   * Compute the cost of total number of read requests  The more unbalanced the higher the
1501   * computed cost will be.  This uses a rolling average of regionload.
1502   */
1503
1504  static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1505
1506    private static final String READ_REQUEST_COST_KEY =
1507        "hbase.master.balancer.stochastic.readRequestCost";
1508    private static final float DEFAULT_READ_REQUEST_COST = 5;
1509
1510    ReadRequestCostFunction(Configuration conf) {
1511      super(conf);
1512      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1513    }
1514
1515    @Override
1516    protected double getCostFromRl(BalancerRegionLoad rl) {
1517      return rl.getReadRequestsCount();
1518    }
1519  }
1520
1521  /**
1522   * Compute the cost of total number of coprocessor requests  The more unbalanced the higher the
1523   * computed cost will be.  This uses a rolling average of regionload.
1524   */
1525
1526  static class CPRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1527
1528    private static final String CP_REQUEST_COST_KEY =
1529        "hbase.master.balancer.stochastic.cpRequestCost";
1530    private static final float DEFAULT_CP_REQUEST_COST = 5;
1531
1532    CPRequestCostFunction(Configuration conf) {
1533      super(conf);
1534      this.setMultiplier(conf.getFloat(CP_REQUEST_COST_KEY, DEFAULT_CP_REQUEST_COST));
1535    }
1536
1537    @Override
1538    protected double getCostFromRl(BalancerRegionLoad rl) {
1539      return rl.getCpRequestsCount();
1540    }
1541  }
1542
1543  /**
1544   * Compute the cost of total number of write requests.  The more unbalanced the higher the
1545   * computed cost will be.  This uses a rolling average of regionload.
1546   */
1547  static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
1548
1549    private static final String WRITE_REQUEST_COST_KEY =
1550        "hbase.master.balancer.stochastic.writeRequestCost";
1551    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1552
1553    WriteRequestCostFunction(Configuration conf) {
1554      super(conf);
1555      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1556    }
1557
1558    @Override
1559    protected double getCostFromRl(BalancerRegionLoad rl) {
1560      return rl.getWriteRequestsCount();
1561    }
1562  }
1563
1564  /**
1565   * A cost function for region replicas. We give a very high cost to hosting
1566   * replicas of the same region in the same host. We do not prevent the case
1567   * though, since if numReplicas > numRegionServers, we still want to keep the
1568   * replica open.
1569   */
1570  static class RegionReplicaHostCostFunction extends CostFunction {
1571    private static final String REGION_REPLICA_HOST_COST_KEY =
1572        "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1573    private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1574
1575    long maxCost = 0;
1576    long[] costsPerGroup; // group is either server, host or rack
1577    int[][] primariesOfRegionsPerGroup;
1578
1579    public RegionReplicaHostCostFunction(Configuration conf) {
1580      super(conf);
1581      this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1582        DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1583    }
1584
1585    @Override
1586    void init(Cluster cluster) {
1587      super.init(cluster);
1588      // max cost is the case where every region replica is hosted together regardless of host
1589      maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1590      costsPerGroup = new long[cluster.numHosts];
1591      primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1592          ? cluster.primariesOfRegionsPerHost
1593          : cluster.primariesOfRegionsPerServer;
1594      for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1595        costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1596      }
1597    }
1598
1599    long getMaxCost(Cluster cluster) {
1600      if (!cluster.hasRegionReplicas) {
1601        return 0; // short circuit
1602      }
1603      // max cost is the case where every region replica is hosted together regardless of host
1604      int[] primariesOfRegions = new int[cluster.numRegions];
1605      System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1606          cluster.regions.length);
1607
1608      Arrays.sort(primariesOfRegions);
1609
1610      // compute numReplicas from the sorted array
1611      return costPerGroup(primariesOfRegions);
1612    }
1613
1614    @Override
1615    boolean isNeeded() {
1616      return cluster.hasRegionReplicas;
1617    }
1618
1619    @Override
1620    protected double cost() {
1621      if (maxCost <= 0) {
1622        return 0;
1623      }
1624
1625      long totalCost = 0;
1626      for (int i = 0 ; i < costsPerGroup.length; i++) {
1627        totalCost += costsPerGroup[i];
1628      }
1629      return scale(0, maxCost, totalCost);
1630    }
1631
1632    /**
1633     * For each primary region, it computes the total number of replicas in the array (numReplicas)
1634     * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1635     * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1636     * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1637     * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1638     * @return a sum of numReplicas-1 squared for each primary region in the group.
1639     */
1640    protected long costPerGroup(int[] primariesOfRegions) {
1641      long cost = 0;
1642      int currentPrimary = -1;
1643      int currentPrimaryIndex = -1;
1644      // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1645      // sharing the same primary will have consecutive numbers in the array.
1646      for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1647        int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1648        if (primary != currentPrimary) { // we see a new primary
1649          int numReplicas = j - currentPrimaryIndex;
1650          // square the cost
1651          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1652            cost += (numReplicas - 1) * (numReplicas - 1);
1653          }
1654          currentPrimary = primary;
1655          currentPrimaryIndex = j;
1656        }
1657      }
1658
1659      return cost;
1660    }
1661
1662    @Override
1663    protected void regionMoved(int region, int oldServer, int newServer) {
1664      if (maxCost <= 0) {
1665        return; // no need to compute
1666      }
1667      if (cluster.multiServersPerHost) {
1668        int oldHost = cluster.serverIndexToHostIndex[oldServer];
1669        int newHost = cluster.serverIndexToHostIndex[newServer];
1670        if (newHost != oldHost) {
1671          costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1672          costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1673        }
1674      } else {
1675        costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1676        costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1677      }
1678    }
1679  }
1680
1681  /**
1682   * A cost function for region replicas for the rack distribution. We give a relatively high
1683   * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1684   * though.
1685   */
1686  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1687    private static final String REGION_REPLICA_RACK_COST_KEY =
1688        "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1689    private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1690
1691    public RegionReplicaRackCostFunction(Configuration conf) {
1692      super(conf);
1693      this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1694        DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1695    }
1696
1697    @Override
1698    void init(Cluster cluster) {
1699      this.cluster = cluster;
1700      if (cluster.numRacks <= 1) {
1701        maxCost = 0;
1702        return; // disabled for 1 rack
1703      }
1704      // max cost is the case where every region replica is hosted together regardless of rack
1705      maxCost = getMaxCost(cluster);
1706      costsPerGroup = new long[cluster.numRacks];
1707      for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1708        costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1709      }
1710    }
1711
1712    @Override
1713    protected void regionMoved(int region, int oldServer, int newServer) {
1714      if (maxCost <= 0) {
1715        return; // no need to compute
1716      }
1717      int oldRack = cluster.serverIndexToRackIndex[oldServer];
1718      int newRack = cluster.serverIndexToRackIndex[newServer];
1719      if (newRack != oldRack) {
1720        costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1721        costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1722      }
1723    }
1724  }
1725
1726  /**
1727   * Compute the cost of total memstore size.  The more unbalanced the higher the
1728   * computed cost will be.  This uses a rolling average of regionload.
1729   */
1730  static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
1731
1732    private static final String MEMSTORE_SIZE_COST_KEY =
1733        "hbase.master.balancer.stochastic.memstoreSizeCost";
1734    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1735
1736    MemStoreSizeCostFunction(Configuration conf) {
1737      super(conf);
1738      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1739    }
1740
1741    @Override
1742    protected double getCostFromRl(BalancerRegionLoad rl) {
1743      return rl.getMemStoreSizeMB();
1744    }
1745  }
1746
1747  /**
1748   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1749   * computed cost will be.  This uses a rolling average of regionload.
1750   */
1751  static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1752
1753    private static final String STOREFILE_SIZE_COST_KEY =
1754        "hbase.master.balancer.stochastic.storefileSizeCost";
1755    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1756
1757    StoreFileCostFunction(Configuration conf) {
1758      super(conf);
1759      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1760    }
1761
1762    @Override
1763    protected double getCostFromRl(BalancerRegionLoad rl) {
1764      return rl.getStorefileSizeMB();
1765    }
1766  }
1767
1768  /**
1769   * A helper function to compose the attribute name from tablename and costfunction name
1770   */
1771  public static String composeAttributeName(String tableName, String costFunctionName) {
1772    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1773  }
1774}