View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayDeque;
21  import java.util.Arrays;
22  import java.util.Collection;
23  import java.util.Deque;
24  import java.util.HashMap;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Map.Entry;
29  import java.util.Random;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.ClusterStatus;
36  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.RegionLoad;
40  import org.apache.hadoop.hbase.ServerLoad;
41  import org.apache.hadoop.hbase.ServerName;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.master.MasterServices;
44  import org.apache.hadoop.hbase.master.RegionPlan;
45  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
46  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
47  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
48  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
49  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52  
53  /**
54   * <p>This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will
55   * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
56   * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
57   * <ul>
58   * <li>Region Load</li>
59   * <li>Table Load</li>
60   * <li>Data Locality</li>
61   * <li>Memstore Sizes</li>
62   * <li>Storefile Sizes</li>
63   * </ul>
64   *
65   *
66   * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
67   * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
68   * scaled by their respective multipliers:</p>
69   *
70   * <ul>
71   *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
72   *   <li>hbase.master.balancer.stochastic.moveCost</li>
73   *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
74   *   <li>hbase.master.balancer.stochastic.localityCost</li>
75   *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
76   *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
77   * </ul>
78   *
79   * <p>In addition to the above configurations, the balancer can be tuned by the following
80   * configuration values:</p>
81   * <ul>
82   *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
83   *   controls the maximum number of regions that can be moved in a single invocation of this
84   *   balancer.</li>
85   *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
86   *   regions is multiplied to try and get the number of times the balancer will
87   *   mutate all servers.</li>
88   *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
89   *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
90   *   value and the above computation.</li>
91   * </ul>
92   *
93   * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
94   * so that the balancer gets the full picture of all loads on the cluster.</p>
95   */
96  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
97  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
98    justification="Complaint is about costFunctions not being synchronized; not end of the world")
99  public class StochasticLoadBalancer extends BaseLoadBalancer {
100 
101   protected static final String STEPS_PER_REGION_KEY =
102       "hbase.master.balancer.stochastic.stepsPerRegion";
103   protected static final String MAX_STEPS_KEY =
104       "hbase.master.balancer.stochastic.maxSteps";
105   protected static final String MAX_RUNNING_TIME_KEY =
106       "hbase.master.balancer.stochastic.maxRunningTime";
107   protected static final String KEEP_REGION_LOADS =
108       "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
109   private static final String TABLE_FUNCTION_SEP = "_";
110 
111   private static final Random RANDOM = new Random(System.currentTimeMillis());
112   private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
113 
114   Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
115 
116   // values are defaults
117   private int maxSteps = 1000000;
118   private int stepsPerRegion = 800;
119   private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
120   private int numRegionLoadsToRemember = 15;
121 
122   private CandidateGenerator[] candidateGenerators;
123   private CostFromRegionLoadFunction[] regionLoadFunctions;
124   private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
125 
126   // to save and report costs to JMX
127   private Double curOverallCost = 0d;
128   private Double[] tempFunctionCosts;
129   private Double[] curFunctionCosts;
130 
131   // Keep locality based picker and cost function to alert them
132   // when new services are offered
133   private LocalityBasedCandidateGenerator localityCandidateGenerator;
134   private LocalityCostFunction localityCost;
135   private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
136   private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
137   private boolean isByTable = false;
138   private TableName tableName = null;
139 
140   /**
141    * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
142    * default MetricsBalancer
143    */
144   public StochasticLoadBalancer() {
145     super(new MetricsStochasticBalancer());
146   }
147 
148   @Override
149   public void onConfigurationChange(Configuration conf) {
150     setConf(conf);
151   }
152 
153   @Override
154   public synchronized void setConf(Configuration conf) {
155     super.setConf(conf);
156     LOG.info("loading config");
157 
158     maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
159 
160     stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
161     maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
162 
163     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
164     isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
165 
166     if (localityCandidateGenerator == null) {
167       localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
168     }
169     localityCost = new LocalityCostFunction(conf, services);
170 
171     if (candidateGenerators == null) {
172       candidateGenerators = new CandidateGenerator[] {
173           new RandomCandidateGenerator(),
174           new LoadCandidateGenerator(),
175           localityCandidateGenerator,
176           new RegionReplicaRackCandidateGenerator(),
177       };
178     }
179 
180     regionLoadFunctions = new CostFromRegionLoadFunction[] {
181       new ReadRequestCostFunction(conf),
182       new WriteRequestCostFunction(conf),
183       new MemstoreSizeCostFunction(conf),
184       new StoreFileCostFunction(conf)
185     };
186 
187     regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
188     regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
189 
190     costFunctions = new CostFunction[]{
191       new RegionCountSkewCostFunction(conf),
192       new PrimaryRegionCountSkewCostFunction(conf),
193       new MoveCostFunction(conf),
194       localityCost,
195       new TableSkewCostFunction(conf),
196       regionReplicaHostCostFunction,
197       regionReplicaRackCostFunction,
198       regionLoadFunctions[0],
199       regionLoadFunctions[1],
200       regionLoadFunctions[2],
201       regionLoadFunctions[3],
202     };
203 
204     curFunctionCosts= new Double[costFunctions.length];
205     tempFunctionCosts= new Double[costFunctions.length];
206 
207   }
208 
209   @Override
210   protected void setSlop(Configuration conf) {
211     this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
212   }
213 
214   @Override
215   public synchronized void setClusterStatus(ClusterStatus st) {
216     super.setClusterStatus(st);
217     updateRegionLoad();
218     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
219       cost.setClusterStatus(st);
220     }
221 
222     // update metrics size
223     try {
224       // by-table or ensemble mode
225       int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
226       int functionsCount = getCostFunctionNames().length;
227 
228       updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
229     } catch (Exception e) {
230       LOG.error("failed to get the size of all tables, exception = " + e.getMessage());
231     }
232   }
233 
234   /**
235    * Update the number of metrics that are reported to JMX
236    */
237   public void updateMetricsSize(int size) {
238     if (metricsBalancer instanceof MetricsStochasticBalancer) {
239         ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
240     }
241   }
242 
243   @Override
244   public synchronized void setMasterServices(MasterServices masterServices) {
245     super.setMasterServices(masterServices);
246     this.localityCost.setServices(masterServices);
247     this.localityCandidateGenerator.setServices(masterServices);
248 
249   }
250 
251   @Override
252   protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
253     regionReplicaHostCostFunction.init(c);
254     if (regionReplicaHostCostFunction.cost() > 0) return true;
255     regionReplicaRackCostFunction.init(c);
256     if (regionReplicaRackCostFunction.cost() > 0) return true;
257     return false;
258   }
259 
260   @Override
261   public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
262     List<HRegionInfo>> clusterState) {
263     this.tableName = tableName;
264     return balanceCluster(clusterState);
265   }
266 
267   /**
268    * Given the cluster state this will try and approach an optimal balance. This
269    * should always approach the optimal state given enough steps.
270    */
271   @Override
272   public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
273     List<HRegionInfo>> clusterState) {
274     List<RegionPlan> plans = balanceMasterRegions(clusterState);
275     if (plans != null || clusterState == null || clusterState.size() <= 1) {
276       return plans;
277     }
278 
279     if (masterServerName != null && clusterState.containsKey(masterServerName)) {
280       if (clusterState.size() <= 2) {
281         return null;
282       }
283       clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
284       clusterState.remove(masterServerName);
285     }
286 
287     // On clusters with lots of HFileLinks or lots of reference files,
288     // instantiating the storefile infos can be quite expensive.
289     // Allow turning this feature off if the locality cost is not going to
290     // be used in any computations.
291     RegionLocationFinder finder = null;
292     if (this.localityCost != null && this.localityCost.getMultiplier() > 0) {
293       finder = this.regionFinder;
294     }
295 
296     //The clusterState that is given to this method contains the state
297     //of all the regions in the table(s) (that's true today)
298     // Keep track of servers to iterate through them.
299     Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
300 
301     if (!needsBalance(cluster)) {
302       return null;
303     }
304 
305     long startTime = EnvironmentEdgeManager.currentTime();
306 
307     initCosts(cluster);
308 
309     double currentCost = computeCost(cluster, Double.MAX_VALUE);
310     curOverallCost = currentCost;
311     for (int i = 0; i < this.curFunctionCosts.length; i++) {
312       curFunctionCosts[i] = tempFunctionCosts[i];
313     }
314 
315     double initCost = currentCost;
316     double newCost = currentCost;
317 
318     long computedMaxSteps = Math.min(this.maxSteps,
319         ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
320     // Perform a stochastic walk to see if we can get a good fit.
321     long step;
322 
323     for (step = 0; step < computedMaxSteps; step++) {
324       int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
325       CandidateGenerator p = candidateGenerators[generatorIdx];
326       Cluster.Action action = p.generate(cluster);
327 
328       if (action.type == Type.NULL) {
329         continue;
330       }
331 
332       cluster.doAction(action);
333       updateCostsWithAction(cluster, action);
334 
335       newCost = computeCost(cluster, currentCost);
336 
337       // Should this be kept?
338       if (newCost < currentCost) {
339         currentCost = newCost;
340 
341         // save for JMX
342         curOverallCost = currentCost;
343         for (int i = 0; i < this.curFunctionCosts.length; i++) {
344           curFunctionCosts[i] = tempFunctionCosts[i];
345         }
346       } else {
347         // Put things back the way they were before.
348         // TODO: undo by remembering old values
349         Action undoAction = action.undoAction();
350         cluster.doAction(undoAction);
351         updateCostsWithAction(cluster, undoAction);
352       }
353 
354       if (EnvironmentEdgeManager.currentTime() - startTime >
355           maxRunningTime) {
356         break;
357       }
358     }
359     long endTime = EnvironmentEdgeManager.currentTime();
360 
361     metricsBalancer.balanceCluster(endTime - startTime);
362 
363     // update costs metrics
364     updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
365     if (initCost > currentCost) {
366       plans = createRegionPlans(cluster);
367       if (LOG.isDebugEnabled()) {
368         LOG.debug("Finished computing new load balance plan.  Computation took "
369             + (endTime - startTime) + "ms to try " + step
370             + " different iterations.  Found a solution that moves "
371             + plans.size() + " regions; Going from a computed cost of "
372             + initCost + " to a new cost of " + currentCost);
373       }
374 
375       return plans;
376     }
377     if (LOG.isDebugEnabled()) {
378       LOG.debug("Could not find a better load balance plan.  Tried "
379           + step + " different configurations in " + (endTime - startTime)
380           + "ms, and did not find anything with a computed cost less than " + initCost);
381     }
382     return null;
383   }
384 
385   /**
386    * update costs to JMX
387    */
388   private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
389     if (tableName == null) return;
390 
391     // check if the metricsBalancer is MetricsStochasticBalancer before casting
392     if (metricsBalancer instanceof MetricsStochasticBalancer) {
393       MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
394       // overall cost
395       balancer.updateStochasticCost(tableName.getNameAsString(),
396         "Overall", "Overall cost", overall);
397 
398       // each cost function
399       for (int i = 0; i < costFunctions.length; i++) {
400         CostFunction costFunction = costFunctions[i];
401         String costFunctionName = costFunction.getClass().getSimpleName();
402         Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
403         // TODO: cost function may need a specific description
404         balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
405           "The percent of " + costFunctionName, costPercent);
406       }
407     }
408   }
409 
410 
411   /**
412    * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
413    * state.
414    *
415    * @param cluster The state of the cluster
416    * @return List of RegionPlan's that represent the moves needed to get to desired final state.
417    */
418   private List<RegionPlan> createRegionPlans(Cluster cluster) {
419     List<RegionPlan> plans = new LinkedList<RegionPlan>();
420     for (int regionIndex = 0;
421          regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
422       int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
423       int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
424 
425       if (initialServerIndex != newServerIndex) {
426         HRegionInfo region = cluster.regions[regionIndex];
427         ServerName initialServer = cluster.servers[initialServerIndex];
428         ServerName newServer = cluster.servers[newServerIndex];
429 
430         if (LOG.isTraceEnabled()) {
431           LOG.trace("Moving Region " + region.getEncodedName() + " from server "
432               + initialServer.getHostname() + " to " + newServer.getHostname());
433         }
434         RegionPlan rp = new RegionPlan(region, initialServer, newServer);
435         plans.add(rp);
436       }
437     }
438     return plans;
439   }
440 
441   /**
442    * Store the current region loads.
443    */
444   private synchronized void updateRegionLoad() {
445     // We create a new hashmap so that regions that are no longer there are removed.
446     // However we temporarily need the old loads so we can use them to keep the rolling average.
447     Map<String, Deque<RegionLoad>> oldLoads = loads;
448     loads = new HashMap<String, Deque<RegionLoad>>();
449 
450     for (ServerName sn : clusterStatus.getServers()) {
451       ServerLoad sl = clusterStatus.getLoad(sn);
452       if (sl == null) {
453         continue;
454       }
455       for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
456         Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
457         if (rLoads == null) {
458           // There was nothing there
459           rLoads = new ArrayDeque<RegionLoad>();
460         } else if (rLoads.size() >= numRegionLoadsToRemember) {
461           rLoads.remove();
462         }
463         rLoads.add(entry.getValue());
464         loads.put(Bytes.toString(entry.getKey()), rLoads);
465 
466       }
467     }
468 
469     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
470       cost.setLoads(loads);
471     }
472   }
473 
474   protected void initCosts(Cluster cluster) {
475     for (CostFunction c:costFunctions) {
476       c.init(cluster);
477     }
478   }
479 
480   protected void updateCostsWithAction(Cluster cluster, Action action) {
481     for (CostFunction c : costFunctions) {
482       c.postAction(action);
483     }
484   }
485 
486   /**
487    * Get the names of the cost functions
488    */
489   public String[] getCostFunctionNames() {
490     if (costFunctions == null) return null;
491     String[] ret = new String[costFunctions.length];
492     for (int i = 0; i < costFunctions.length; i++) {
493       CostFunction c = costFunctions[i];
494       ret[i] = c.getClass().getSimpleName();
495     }
496 
497     return ret;
498   }
499 
500   /**
501    * This is the main cost function.  It will compute a cost associated with a proposed cluster
502    * state.  All different costs will be combined with their multipliers to produce a double cost.
503    *
504    * @param cluster The state of the cluster
505    * @param previousCost the previous cost. This is used as an early out.
506    * @return a double of a cost associated with the proposed cluster state.  This cost is an
507    *         aggregate of all individual cost functions.
508    */
509   protected double computeCost(Cluster cluster, double previousCost) {
510     double total = 0;
511 
512     for (int i = 0; i < costFunctions.length; i++) {
513       CostFunction c = costFunctions[i];
514       this.tempFunctionCosts[i] = 0.0;
515 
516       if (c.getMultiplier() <= 0) {
517         continue;
518       }
519 
520       Float multiplier = c.getMultiplier();
521       Double cost = c.cost();
522 
523       this.tempFunctionCosts[i] = multiplier*cost;
524       total += this.tempFunctionCosts[i];
525 
526       if (total > previousCost) {
527         break;
528       }
529     }
530 
531     return total;
532   }
533 
534   /** Generates a candidate action to be applied to the cluster for cost function search */
535   abstract static class CandidateGenerator {
536     abstract Cluster.Action generate(Cluster cluster);
537 
538     /**
539      * From a list of regions pick a random one. Null can be returned which
540      * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
541      * rather than swap.
542      *
543      * @param cluster        The state of the cluster
544      * @param server         index of the server
545      * @param chanceOfNoSwap Chance that this will decide to try a move rather
546      *                       than a swap.
547      * @return a random {@link HRegionInfo} or null if an asymmetrical move is
548      *         suggested.
549      */
550     protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
551       // Check to see if this is just a move.
552       if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
553         // signal a move only.
554         return -1;
555       }
556       int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
557       return cluster.regionsPerServer[server][rand];
558 
559     }
560     protected int pickRandomServer(Cluster cluster) {
561       if (cluster.numServers < 1) {
562         return -1;
563       }
564 
565       return RANDOM.nextInt(cluster.numServers);
566     }
567 
568     protected int pickRandomRack(Cluster cluster) {
569       if (cluster.numRacks < 1) {
570         return -1;
571       }
572 
573       return RANDOM.nextInt(cluster.numRacks);
574     }
575 
576     protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
577       if (cluster.numServers < 2) {
578         return -1;
579       }
580       while (true) {
581         int otherServerIndex = pickRandomServer(cluster);
582         if (otherServerIndex != serverIndex) {
583           return otherServerIndex;
584         }
585       }
586     }
587 
588     protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
589       if (cluster.numRacks < 2) {
590         return -1;
591       }
592       while (true) {
593         int otherRackIndex = pickRandomRack(cluster);
594         if (otherRackIndex != rackIndex) {
595           return otherRackIndex;
596         }
597       }
598     }
599 
600     protected Cluster.Action pickRandomRegions(Cluster cluster,
601                                                        int thisServer,
602                                                        int otherServer) {
603       if (thisServer < 0 || otherServer < 0) {
604         return Cluster.NullAction;
605       }
606 
607       // Decide who is most likely to need another region
608       int thisRegionCount = cluster.getNumRegions(thisServer);
609       int otherRegionCount = cluster.getNumRegions(otherServer);
610 
611       // Assign the chance based upon the above
612       double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
613       double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
614 
615       int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
616       int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
617 
618       return getAction(thisServer, thisRegion, otherServer, otherRegion);
619     }
620 
621     protected Cluster.Action getAction(int fromServer, int fromRegion,
622         int toServer, int toRegion) {
623       if (fromServer < 0 || toServer < 0) {
624         return Cluster.NullAction;
625       }
626       if (fromRegion > 0 && toRegion > 0) {
627         return new Cluster.SwapRegionsAction(fromServer, fromRegion,
628           toServer, toRegion);
629       } else if (fromRegion > 0) {
630         return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
631       } else if (toRegion > 0) {
632         return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
633       } else {
634         return Cluster.NullAction;
635       }
636     }
637   }
638 
639   static class RandomCandidateGenerator extends CandidateGenerator {
640 
641     @Override
642     Cluster.Action generate(Cluster cluster) {
643 
644       int thisServer = pickRandomServer(cluster);
645 
646       // Pick the other server
647       int otherServer = pickOtherRandomServer(cluster, thisServer);
648 
649       return pickRandomRegions(cluster, thisServer, otherServer);
650     }
651   }
652 
653   static class LoadCandidateGenerator extends CandidateGenerator {
654 
655     @Override
656     Cluster.Action generate(Cluster cluster) {
657       cluster.sortServersByRegionCount();
658       int thisServer = pickMostLoadedServer(cluster, -1);
659       int otherServer = pickLeastLoadedServer(cluster, thisServer);
660 
661       return pickRandomRegions(cluster, thisServer, otherServer);
662     }
663 
664     private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
665       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
666 
667       int index = 0;
668       while (servers[index] == null || servers[index] == thisServer) {
669         index++;
670         if (index == servers.length) {
671           return -1;
672         }
673       }
674       return servers[index];
675     }
676 
677     private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
678       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
679 
680       int index = servers.length - 1;
681       while (servers[index] == null || servers[index] == thisServer) {
682         index--;
683         if (index < 0) {
684           return -1;
685         }
686       }
687       return servers[index];
688     }
689   }
690 
691   static class LocalityBasedCandidateGenerator extends CandidateGenerator {
692 
693     private MasterServices masterServices;
694 
695     LocalityBasedCandidateGenerator(MasterServices masterServices) {
696       this.masterServices = masterServices;
697     }
698 
699     @Override
700     Cluster.Action generate(Cluster cluster) {
701       if (this.masterServices == null) {
702         int thisServer = pickRandomServer(cluster);
703         // Pick the other server
704         int otherServer = pickOtherRandomServer(cluster, thisServer);
705         return pickRandomRegions(cluster, thisServer, otherServer);
706       }
707 
708       int thisServer = pickRandomServer(cluster);
709       int thisRegion;
710       if (thisServer == -1) {
711         LOG.warn("Could not pick lowest locality region server");
712         return Cluster.NullAction;
713       } else {
714       // Pick lowest locality region on this server
715         thisRegion = pickLowestLocalityRegionOnServer(cluster, thisServer);
716       }
717 
718       if (thisRegion == -1) {
719         return Cluster.NullAction;
720       }
721 
722       // Pick the least loaded server with good locality for the region
723       int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion, thisServer);
724 
725       if (otherServer == -1) {
726         return Cluster.NullAction;
727       }
728 
729       // Let the candidate region be moved to its highest locality server.
730       int otherRegion = -1;
731 
732       return getAction(thisServer, thisRegion, otherServer, otherRegion);
733     }
734 
735     private int pickLowestLocalityServer(Cluster cluster) {
736       return cluster.getLowestLocalityRegionServer();
737     }
738 
739     private int pickLowestLocalityRegionOnServer(Cluster cluster, int server) {
740       return cluster.getLowestLocalityRegionOnServer(server);
741     }
742 
743     void setServices(MasterServices services) {
744       this.masterServices = services;
745     }
746   }
747 
748   /**
749    * Generates candidates which moves the replicas out of the region server for
750    * co-hosted region replicas
751    */
752   static class RegionReplicaCandidateGenerator extends CandidateGenerator {
753 
754     RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
755 
756     /**
757      * Randomly select one regionIndex out of all region replicas co-hosted in the same group
758      * (a group is a server, host or rack)
759      * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
760      * primariesOfRegionsPerHost or primariesOfRegionsPerRack
761      * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
762      * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
763      * @return a regionIndex for the selected primary or -1 if there is no co-locating
764      */
765     int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
766         , int[] regionIndexToPrimaryIndex) {
767       int currentPrimary = -1;
768       int currentPrimaryIndex = -1;
769       int selectedPrimaryIndex = -1;
770       double currentLargestRandom = -1;
771       // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
772       // ids for the regions hosted in server, a consecutive repetition means that replicas
773       // are co-hosted
774       for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
775         int primary = j < primariesOfRegionsPerGroup.length
776             ? primariesOfRegionsPerGroup[j] : -1;
777         if (primary != currentPrimary) { // check for whether we see a new primary
778           int numReplicas = j - currentPrimaryIndex;
779           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
780             // decide to select this primary region id or not
781             double currentRandom = RANDOM.nextDouble();
782             // we don't know how many region replicas are co-hosted, we will randomly select one
783             // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
784             if (currentRandom > currentLargestRandom) {
785               selectedPrimaryIndex = currentPrimary;
786               currentLargestRandom = currentRandom;
787             }
788           }
789           currentPrimary = primary;
790           currentPrimaryIndex = j;
791         }
792       }
793 
794       // we have found the primary id for the region to move. Now find the actual regionIndex
795       // with the given primary, prefer to move the secondary region.
796       for (int j = 0; j < regionsPerGroup.length; j++) {
797         int regionIndex = regionsPerGroup[j];
798         if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
799           // always move the secondary, not the primary
800           if (selectedPrimaryIndex != regionIndex) {
801             return regionIndex;
802           }
803         }
804       }
805       return -1;
806     }
807 
808     @Override
809     Cluster.Action generate(Cluster cluster) {
810       int serverIndex = pickRandomServer(cluster);
811       if (cluster.numServers <= 1 || serverIndex == -1) {
812         return Cluster.NullAction;
813       }
814 
815       int regionIndex = selectCoHostedRegionPerGroup(
816         cluster.primariesOfRegionsPerServer[serverIndex],
817         cluster.regionsPerServer[serverIndex],
818         cluster.regionIndexToPrimaryIndex);
819 
820       // if there are no pairs of region replicas co-hosted, default to random generator
821       if (regionIndex == -1) {
822         // default to randompicker
823         return randomGenerator.generate(cluster);
824       }
825 
826       int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
827       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
828       return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
829     }
830   }
831 
832   /**
833    * Generates candidates which moves the replicas out of the rack for
834    * co-hosted region replicas in the same rack
835    */
836   static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
837     @Override
838     Cluster.Action generate(Cluster cluster) {
839       int rackIndex = pickRandomRack(cluster);
840       if (cluster.numRacks <= 1 || rackIndex == -1) {
841         return super.generate(cluster);
842       }
843 
844       int regionIndex = selectCoHostedRegionPerGroup(
845         cluster.primariesOfRegionsPerRack[rackIndex],
846         cluster.regionsPerRack[rackIndex],
847         cluster.regionIndexToPrimaryIndex);
848 
849       // if there are no pairs of region replicas co-hosted, default to random generator
850       if (regionIndex == -1) {
851         // default to randompicker
852         return randomGenerator.generate(cluster);
853       }
854 
855       int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
856       int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
857 
858       int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
859       int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
860       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
861       return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
862     }
863   }
864 
865   /**
866    * Base class of StochasticLoadBalancer's Cost Functions.
867    */
868   abstract static class CostFunction {
869 
870     private float multiplier = 0;
871 
872     protected Cluster cluster;
873 
874     CostFunction(Configuration c) {
875 
876     }
877 
878     float getMultiplier() {
879       return multiplier;
880     }
881 
882     void setMultiplier(float m) {
883       this.multiplier = m;
884     }
885 
886     /** Called once per LB invocation to give the cost function
887      * to initialize it's state, and perform any costly calculation.
888      */
889     void init(Cluster cluster) {
890       this.cluster = cluster;
891     }
892 
893     /** Called once per cluster Action to give the cost function
894      * an opportunity to update it's state. postAction() is always
895      * called at least once before cost() is called with the cluster
896      * that this action is performed on. */
897     void postAction(Action action) {
898       switch (action.type) {
899       case NULL: break;
900       case ASSIGN_REGION:
901         AssignRegionAction ar = (AssignRegionAction) action;
902         regionMoved(ar.region, -1, ar.server);
903         break;
904       case MOVE_REGION:
905         MoveRegionAction mra = (MoveRegionAction) action;
906         regionMoved(mra.region, mra.fromServer, mra.toServer);
907         break;
908       case SWAP_REGIONS:
909         SwapRegionsAction a = (SwapRegionsAction) action;
910         regionMoved(a.fromRegion, a.fromServer, a.toServer);
911         regionMoved(a.toRegion, a.toServer, a.fromServer);
912         break;
913       default:
914         throw new RuntimeException("Uknown action:" + action.type);
915       }
916     }
917 
918     protected void regionMoved(int region, int oldServer, int newServer) {
919     }
920 
921     abstract double cost();
922 
923     /**
924      * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
925      * assumes that this is a zero sum set of costs.  It assumes that the worst case
926      * possible is all of the elements in one region server and the rest having 0.
927      *
928      * @param stats the costs
929      * @return a scaled set of costs.
930      */
931     protected double costFromArray(double[] stats) {
932       double totalCost = 0;
933       double total = getSum(stats);
934 
935       double count = stats.length;
936       double mean = total/count;
937 
938       // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
939       // a zero sum cost for this to make sense.
940       double max = ((count - 1) * mean) + (total - mean);
941 
942       // It's possible that there aren't enough regions to go around
943       double min;
944       if (count > total) {
945         min = ((count - total) * mean) + ((1 - mean) * total);
946       } else {
947         // Some will have 1 more than everything else.
948         int numHigh = (int) (total - (Math.floor(mean) * count));
949         int numLow = (int) (count - numHigh);
950 
951         min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
952 
953       }
954       min = Math.max(0, min);
955       for (int i=0; i<stats.length; i++) {
956         double n = stats[i];
957         double diff = Math.abs(mean - n);
958         totalCost += diff;
959       }
960 
961       double scaled =  scale(min, max, totalCost);
962       return scaled;
963     }
964 
965     private double getSum(double[] stats) {
966       double total = 0;
967       for(double s:stats) {
968         total += s;
969       }
970       return total;
971     }
972 
973     /**
974      * Scale the value between 0 and 1.
975      *
976      * @param min   Min value
977      * @param max   The Max value
978      * @param value The value to be scaled.
979      * @return The scaled value.
980      */
981     protected double scale(double min, double max, double value) {
982       if (max <= min || value <= min) {
983         return 0;
984       }
985       if ((max - min) == 0) return 0;
986 
987       return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
988     }
989   }
990 
991   /**
992    * Given the starting state of the regions and a potential ending state
993    * compute cost based upon the number of regions that have moved.
994    */
995   static class MoveCostFunction extends CostFunction {
996     private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
997     private static final String MAX_MOVES_PERCENT_KEY =
998         "hbase.master.balancer.stochastic.maxMovePercent";
999     private static final float DEFAULT_MOVE_COST = 7;
1000     private static final int DEFAULT_MAX_MOVES = 600;
1001     private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1002 
1003     private final float maxMovesPercent;
1004 
1005     MoveCostFunction(Configuration conf) {
1006       super(conf);
1007 
1008       // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
1009       // that large benefits are need to overcome the cost of a move.
1010       this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1011       // What percent of the number of regions a single run of the balancer can move.
1012       maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1013     }
1014 
1015     @Override
1016     double cost() {
1017       // Try and size the max number of Moves, but always be prepared to move some.
1018       int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1019           DEFAULT_MAX_MOVES);
1020 
1021       double moveCost = cluster.numMovedRegions;
1022 
1023       // Don't let this single balance move more than the max moves.
1024       // This allows better scaling to accurately represent the actual cost of a move.
1025       if (moveCost > maxMoves) {
1026         return 1000000;   // return a number much greater than any of the other cost
1027       }
1028 
1029       return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
1030     }
1031   }
1032 
1033   /**
1034    * Compute the cost of a potential cluster state from skew in number of
1035    * regions on a cluster.
1036    */
1037   static class RegionCountSkewCostFunction extends CostFunction {
1038     private static final String REGION_COUNT_SKEW_COST_KEY =
1039         "hbase.master.balancer.stochastic.regionCountCost";
1040     private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1041 
1042     private double[] stats = null;
1043 
1044     RegionCountSkewCostFunction(Configuration conf) {
1045       super(conf);
1046       // Load multiplier should be the greatest as it is the most general way to balance data.
1047       this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1048     }
1049 
1050     @Override
1051     double cost() {
1052       if (stats == null || stats.length != cluster.numServers) {
1053         stats = new double[cluster.numServers];
1054       }
1055 
1056       for (int i =0; i < cluster.numServers; i++) {
1057         stats[i] = cluster.regionsPerServer[i].length;
1058       }
1059 
1060       return costFromArray(stats);
1061     }
1062   }
1063 
1064   /**
1065    * Compute the cost of a potential cluster state from skew in number of
1066    * primary regions on a cluster.
1067    */
1068   static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1069     private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1070         "hbase.master.balancer.stochastic.primaryRegionCountCost";
1071     private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1072 
1073     private double[] stats = null;
1074 
1075     PrimaryRegionCountSkewCostFunction(Configuration conf) {
1076       super(conf);
1077       // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1078       this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1079         DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1080     }
1081 
1082     @Override
1083     double cost() {
1084       if (!cluster.hasRegionReplicas) {
1085         return 0;
1086       }
1087       if (stats == null || stats.length != cluster.numServers) {
1088         stats = new double[cluster.numServers];
1089       }
1090 
1091       for (int i =0; i < cluster.numServers; i++) {
1092         stats[i] = 0;
1093         for (int regionIdx : cluster.regionsPerServer[i]) {
1094           if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1095             stats[i] ++;
1096           }
1097         }
1098       }
1099 
1100       return costFromArray(stats);
1101     }
1102   }
1103 
1104   /**
1105    * Compute the cost of a potential cluster configuration based upon how evenly
1106    * distributed tables are.
1107    */
1108   static class TableSkewCostFunction extends CostFunction {
1109 
1110     private static final String TABLE_SKEW_COST_KEY =
1111         "hbase.master.balancer.stochastic.tableSkewCost";
1112     private static final float DEFAULT_TABLE_SKEW_COST = 35;
1113 
1114     TableSkewCostFunction(Configuration conf) {
1115       super(conf);
1116       this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1117     }
1118 
1119     @Override
1120     double cost() {
1121       double max = cluster.numRegions;
1122       double min = ((double) cluster.numRegions) / cluster.numServers;
1123       double value = 0;
1124 
1125       for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1126         value += cluster.numMaxRegionsPerTable[i];
1127       }
1128 
1129       return scale(min, max, value);
1130     }
1131   }
1132 
1133   /**
1134    * Compute a cost of a potential cluster configuration based upon where
1135    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
1136    */
1137   static class LocalityCostFunction extends CostFunction {
1138 
1139     private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1140     private static final float DEFAULT_LOCALITY_COST = 25;
1141 
1142     private MasterServices services;
1143 
1144     LocalityCostFunction(Configuration conf, MasterServices srv) {
1145       super(conf);
1146       this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
1147       this.services = srv;
1148     }
1149 
1150     void setServices(MasterServices srvc) {
1151       this.services = srvc;
1152     }
1153 
1154     @Override
1155     double cost() {
1156       double max = 0;
1157       double cost = 0;
1158 
1159       // If there's no master so there's no way anything else works.
1160       if (this.services == null) {
1161         return cost;
1162       }
1163 
1164       for (int i = 0; i < cluster.regionLocations.length; i++) {
1165         max += 1;
1166         int serverIndex = cluster.regionIndexToServerIndex[i];
1167         int[] regionLocations = cluster.regionLocations[i];
1168 
1169         // If we can't find where the data is getTopBlock returns null.
1170         // so count that as being the best possible.
1171         if (regionLocations == null) {
1172           continue;
1173         }
1174 
1175         int index = -1;
1176         for (int j = 0; j < regionLocations.length; j++) {
1177           if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
1178             index = j;
1179             break;
1180           }
1181         }
1182 
1183         if (index < 0) {
1184           cost += 1;
1185         } else {
1186           cost += (1 - cluster.getLocalityOfRegion(i, index));
1187         }
1188       }
1189       return scale(0, max, cost);
1190     }
1191   }
1192 
1193   /**
1194    * Base class the allows writing costs functions from rolling average of some
1195    * number from RegionLoad.
1196    */
1197   abstract static class CostFromRegionLoadFunction extends CostFunction {
1198 
1199     private ClusterStatus clusterStatus = null;
1200     private Map<String, Deque<RegionLoad>> loads = null;
1201     private double[] stats = null;
1202     CostFromRegionLoadFunction(Configuration conf) {
1203       super(conf);
1204     }
1205 
1206     void setClusterStatus(ClusterStatus status) {
1207       this.clusterStatus = status;
1208     }
1209 
1210     void setLoads(Map<String, Deque<RegionLoad>> l) {
1211       this.loads = l;
1212     }
1213 
1214     @Override
1215     double cost() {
1216       if (clusterStatus == null || loads == null) {
1217         return 0;
1218       }
1219 
1220       if (stats == null || stats.length != cluster.numServers) {
1221         stats = new double[cluster.numServers];
1222       }
1223 
1224       for (int i =0; i < stats.length; i++) {
1225         //Cost this server has from RegionLoad
1226         long cost = 0;
1227 
1228         // for every region on this server get the rl
1229         for(int regionIndex:cluster.regionsPerServer[i]) {
1230           Collection<RegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1231 
1232           // Now if we found a region load get the type of cost that was requested.
1233           if (regionLoadList != null) {
1234             cost += getRegionLoadCost(regionLoadList);
1235           }
1236         }
1237 
1238         // Add the total cost to the stats.
1239         stats[i] = cost;
1240       }
1241 
1242       // Now return the scaled cost from data held in the stats object.
1243       return costFromArray(stats);
1244     }
1245 
1246     protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
1247       double cost = 0;
1248 
1249       for (RegionLoad rl : regionLoadList) {
1250         double toAdd = getCostFromRl(rl);
1251 
1252         if (cost == 0) {
1253           cost = toAdd;
1254         } else {
1255           cost = (.5 * cost) + (.5 * toAdd);
1256         }
1257       }
1258 
1259       return cost;
1260     }
1261 
1262     protected abstract double getCostFromRl(RegionLoad rl);
1263   }
1264 
1265   /**
1266    * Compute the cost of total number of read requests  The more unbalanced the higher the
1267    * computed cost will be.  This uses a rolling average of regionload.
1268    */
1269 
1270   static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
1271 
1272     private static final String READ_REQUEST_COST_KEY =
1273         "hbase.master.balancer.stochastic.readRequestCost";
1274     private static final float DEFAULT_READ_REQUEST_COST = 5;
1275 
1276     ReadRequestCostFunction(Configuration conf) {
1277       super(conf);
1278       this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1279     }
1280 
1281 
1282     @Override
1283     protected double getCostFromRl(RegionLoad rl) {
1284       return rl.getReadRequestsCount();
1285     }
1286   }
1287 
1288   /**
1289    * Compute the cost of total number of write requests.  The more unbalanced the higher the
1290    * computed cost will be.  This uses a rolling average of regionload.
1291    */
1292   static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
1293 
1294     private static final String WRITE_REQUEST_COST_KEY =
1295         "hbase.master.balancer.stochastic.writeRequestCost";
1296     private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1297 
1298     WriteRequestCostFunction(Configuration conf) {
1299       super(conf);
1300       this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1301     }
1302 
1303     @Override
1304     protected double getCostFromRl(RegionLoad rl) {
1305       return rl.getWriteRequestsCount();
1306     }
1307   }
1308 
1309   /**
1310    * A cost function for region replicas. We give a very high cost to hosting
1311    * replicas of the same region in the same host. We do not prevent the case
1312    * though, since if numReplicas > numRegionServers, we still want to keep the
1313    * replica open.
1314    */
1315   static class RegionReplicaHostCostFunction extends CostFunction {
1316     private static final String REGION_REPLICA_HOST_COST_KEY =
1317         "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1318     private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1319 
1320     long maxCost = 0;
1321     long[] costsPerGroup; // group is either server, host or rack
1322     int[][] primariesOfRegionsPerGroup;
1323 
1324     public RegionReplicaHostCostFunction(Configuration conf) {
1325       super(conf);
1326       this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1327         DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1328     }
1329 
1330     @Override
1331     void init(Cluster cluster) {
1332       super.init(cluster);
1333       // max cost is the case where every region replica is hosted together regardless of host
1334       maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1335       costsPerGroup = new long[cluster.numHosts];
1336       primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1337           ? cluster.primariesOfRegionsPerHost
1338           : cluster.primariesOfRegionsPerServer;
1339       for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1340         costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1341       }
1342     }
1343 
1344     long getMaxCost(Cluster cluster) {
1345       if (!cluster.hasRegionReplicas) {
1346         return 0; // short circuit
1347       }
1348       // max cost is the case where every region replica is hosted together regardless of host
1349       int[] primariesOfRegions = new int[cluster.numRegions];
1350       System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1351           cluster.regions.length);
1352 
1353       Arrays.sort(primariesOfRegions);
1354 
1355       // compute numReplicas from the sorted array
1356       return costPerGroup(primariesOfRegions);
1357     }
1358 
1359     @Override
1360     double cost() {
1361       if (maxCost <= 0) {
1362         return 0;
1363       }
1364 
1365       long totalCost = 0;
1366       for (int i = 0 ; i < costsPerGroup.length; i++) {
1367         totalCost += costsPerGroup[i];
1368       }
1369       return scale(0, maxCost, totalCost);
1370     }
1371 
1372     /**
1373      * For each primary region, it computes the total number of replicas in the array (numReplicas)
1374      * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1375      * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1376      * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1377      * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1378      * @return a sum of numReplicas-1 squared for each primary region in the group.
1379      */
1380     protected long costPerGroup(int[] primariesOfRegions) {
1381       long cost = 0;
1382       int currentPrimary = -1;
1383       int currentPrimaryIndex = -1;
1384       // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1385       // sharing the same primary will have consecutive numbers in the array.
1386       for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1387         int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1388         if (primary != currentPrimary) { // we see a new primary
1389           int numReplicas = j - currentPrimaryIndex;
1390           // square the cost
1391           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1392             cost += (numReplicas - 1) * (numReplicas - 1);
1393           }
1394           currentPrimary = primary;
1395           currentPrimaryIndex = j;
1396         }
1397       }
1398 
1399       return cost;
1400     }
1401 
1402     @Override
1403     protected void regionMoved(int region, int oldServer, int newServer) {
1404       if (maxCost <= 0) {
1405         return; // no need to compute
1406       }
1407       if (cluster.multiServersPerHost) {
1408         int oldHost = cluster.serverIndexToHostIndex[oldServer];
1409         int newHost = cluster.serverIndexToHostIndex[newServer];
1410         if (newHost != oldHost) {
1411           costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1412           costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1413         }
1414       } else {
1415         costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1416         costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1417       }
1418     }
1419   }
1420 
1421   /**
1422    * A cost function for region replicas for the rack distribution. We give a relatively high
1423    * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1424    * though.
1425    */
1426   static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1427     private static final String REGION_REPLICA_RACK_COST_KEY =
1428         "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1429     private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1430 
1431     public RegionReplicaRackCostFunction(Configuration conf) {
1432       super(conf);
1433       this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1434         DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1435     }
1436 
1437     @Override
1438     void init(Cluster cluster) {
1439       this.cluster = cluster;
1440       if (cluster.numRacks <= 1) {
1441         maxCost = 0;
1442         return; // disabled for 1 rack
1443       }
1444       // max cost is the case where every region replica is hosted together regardless of rack
1445       maxCost = getMaxCost(cluster);
1446       costsPerGroup = new long[cluster.numRacks];
1447       for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1448         costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1449       }
1450     }
1451 
1452     @Override
1453     protected void regionMoved(int region, int oldServer, int newServer) {
1454       if (maxCost <= 0) {
1455         return; // no need to compute
1456       }
1457       int oldRack = cluster.serverIndexToRackIndex[oldServer];
1458       int newRack = cluster.serverIndexToRackIndex[newServer];
1459       if (newRack != oldRack) {
1460         costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1461         costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1462       }
1463     }
1464   }
1465 
1466   /**
1467    * Compute the cost of total memstore size.  The more unbalanced the higher the
1468    * computed cost will be.  This uses a rolling average of regionload.
1469    */
1470   static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
1471 
1472     private static final String MEMSTORE_SIZE_COST_KEY =
1473         "hbase.master.balancer.stochastic.memstoreSizeCost";
1474     private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1475 
1476     MemstoreSizeCostFunction(Configuration conf) {
1477       super(conf);
1478       this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1479     }
1480 
1481     @Override
1482     protected double getCostFromRl(RegionLoad rl) {
1483       return rl.getMemStoreSizeMB();
1484     }
1485   }
1486   /**
1487    * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1488    * computed cost will be.  This uses a rolling average of regionload.
1489    */
1490   static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1491 
1492     private static final String STOREFILE_SIZE_COST_KEY =
1493         "hbase.master.balancer.stochastic.storefileSizeCost";
1494     private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1495 
1496     StoreFileCostFunction(Configuration conf) {
1497       super(conf);
1498       this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1499     }
1500 
1501     @Override
1502     protected double getCostFromRl(RegionLoad rl) {
1503       return rl.getStorefileSizeMB();
1504     }
1505   }
1506 
1507   /**
1508    * A helper function to compose the attribute name from tablename and costfunction name
1509    */
1510   public static String composeAttributeName(String tableName, String costFunctionName) {
1511     return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1512   }
1513 }