View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayDeque;
21  import java.util.Arrays;
22  import java.util.Collection;
23  import java.util.Deque;
24  import java.util.HashMap;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Map.Entry;
29  import java.util.Random;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.ClusterStatus;
36  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
37  import org.apache.hadoop.hbase.HRegionInfo;
38  import org.apache.hadoop.hbase.RegionLoad;
39  import org.apache.hadoop.hbase.ServerLoad;
40  import org.apache.hadoop.hbase.ServerName;
41  import org.apache.hadoop.hbase.master.MasterServices;
42  import org.apache.hadoop.hbase.master.RegionPlan;
43  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
44  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
45  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
46  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
47  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  
51  /**
 * <p>This is a best-effort load balancer. Given a cost function F(C) =&gt; x, it will
 * randomly try and mutate the cluster to Cprime. If F(Cprime) &lt; F(C), then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
55   * <ul>
56   * <li>Region Load</li>
57   * <li>Table Load</li>
58   * <li>Data Locality</li>
59   * <li>Memstore Sizes</li>
60   * <li>Storefile Sizes</li>
61   * </ul>
62   *
63   *
64   * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
65   * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
66   * scaled by their respective multipliers:</p>
67   *
68   * <ul>
69   *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
70   *   <li>hbase.master.balancer.stochastic.moveCost</li>
71   *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
72   *   <li>hbase.master.balancer.stochastic.localityCost</li>
73   *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
74   *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
75   * </ul>
76   *
77   * <p>In addition to the above configurations, the balancer can be tuned by the following
78   * configuration values:</p>
79   * <ul>
80   *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
81   *   controls what the max number of regions that can be moved in a single invocation of this
82   *   balancer.</li>
83   *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
84   *   regions is multiplied to try and get the number of times the balancer will
85   *   mutate all servers.</li>
86   *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
87   *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
88   *   value and the above computation.</li>
89   * </ul>
90   *
91   * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
92   * so that the balancer gets the full picture of all loads on the cluster.</p>
93   */
94  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
95  public class StochasticLoadBalancer extends BaseLoadBalancer {
96  
  protected static final String STEPS_PER_REGION_KEY =
      "hbase.master.balancer.stochastic.stepsPerRegion";
  protected static final String MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.maxSteps";
  protected static final String MAX_RUNNING_TIME_KEY =
      "hbase.master.balancer.stochastic.maxRunningTime";
  protected static final String KEEP_REGION_LOADS =
      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";

  // Shared RNG for all candidate generators; seeded once with wall-clock time.
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);

  // Rolling window of recent RegionLoad samples per region (keyed by the region name
  // rendered as a String); rebuilt by updateRegionLoad() on each cluster-status refresh.
  Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();

  // values are defaults
  private int maxSteps = 1000000;
  private int stepsPerRegion = 800;
  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
  private int numRegionLoadsToRemember = 15;

  private CandidateGenerator[] candidateGenerators;
  private CostFromRegionLoadFunction[] regionLoadFunctions;
  private CostFunction[] costFunctions;
  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private LocalityCostFunction localityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
126 
  @Override
  public void onConfigurationChange(Configuration conf) {
    // Re-run the full configuration pass so step limits and cost functions
    // pick up the new values at runtime.
    setConf(conf);
  }
131 
  /**
   * Read the balancer tunables from the configuration and (re)build the candidate
   * generators and cost functions. Synchronized since it can be invoked at runtime
   * through {@link #onConfigurationChange(Configuration)}.
   */
  @Override
  public synchronized void setConf(Configuration conf) {
    super.setConf(conf);
    LOG.info("loading config");

    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);

    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);

    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);

    // Locality generator is created once and kept so setMasterServices() can
    // later hand it the MasterServices reference.
    if (localityCandidateGenerator == null) {
      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
    }
    localityCost = new LocalityCostFunction(conf, services);

    if (candidateGenerators == null) {
      candidateGenerators = new CandidateGenerator[] {
          new RandomCandidateGenerator(),
          new LoadCandidateGenerator(),
          localityCandidateGenerator,
          new RegionReplicaRackCandidateGenerator(),
      };
    }

    // Load-history-driven cost functions; also referenced individually in
    // setClusterStatus() and in the combined costFunctions array below.
    regionLoadFunctions = new CostFromRegionLoadFunction[] {
      new ReadRequestCostFunction(conf),
      new WriteRequestCostFunction(conf),
      new MemstoreSizeCostFunction(conf),
      new StoreFileCostFunction(conf)
    };

    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);

    costFunctions = new CostFunction[]{
      new RegionCountSkewCostFunction(conf),
      new PrimaryRegionCountSkewCostFunction(conf),
      new MoveCostFunction(conf),
      localityCost,
      new TableSkewCostFunction(conf),
      regionReplicaHostCostFunction,
      regionReplicaRackCostFunction,
      regionLoadFunctions[0],
      regionLoadFunctions[1],
      regionLoadFunctions[2],
      regionLoadFunctions[3],
    };
  }
182 
  @Override
  protected void setSlop(Configuration conf) {
    // Near-zero slop: this balancer wants to be consulted on essentially any imbalance.
    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
  }
187 
  @Override
  public synchronized void setClusterStatus(ClusterStatus st) {
    super.setClusterStatus(st);
    // Refresh the per-region load history and push the status to the
    // load-based cost functions.
    updateRegionLoad();
    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
      cost.setClusterStatus(st);
    }
  }
196 
  @Override
  public synchronized void setMasterServices(MasterServices masterServices) {
    super.setMasterServices(masterServices);
    // Propagate the services handle to the locality-aware pieces built in setConf().
    this.localityCost.setServices(masterServices);
    this.localityCandidateGenerator.setServices(masterServices);

  }
204 
205   @Override
206   protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
207     regionReplicaHostCostFunction.init(c);
208     if (regionReplicaHostCostFunction.cost() > 0) return true;
209     regionReplicaRackCostFunction.init(c);
210     if (regionReplicaRackCostFunction.cost() > 0) return true;
211     return false;
212   }
213 
  /**
   * Given the cluster state this will try and approach an optimal balance. This
   * should always approach the optimal state given enough steps.
   *
   * @param clusterState map of server to the regions it hosts
   * @return list of region moves to apply, or null if no better plan was found
   */
  @Override
  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
    List<HRegionInfo>> clusterState) {
    // Let the base class handle regions pinned to/off the master first.
    List<RegionPlan> plans = balanceMasterRegions(clusterState);
    if (plans != null || clusterState == null || clusterState.size() <= 1) {
      return plans;
    }
    if (masterServerName != null && clusterState.containsKey(masterServerName)) {
      if (clusterState.size() <= 2) {
        return null;
      }
      // Balance only the non-master servers; copy so the caller's map is not mutated.
      clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
      clusterState.remove(masterServerName);
    }

    // On clusters with lots of HFileLinks or lots of reference files,
    // instantiating the storefile infos can be quite expensive.
    // Allow turning this feature off if the locality cost is not going to
    // be used in any computations.
    RegionLocationFinder finder = null;
    if (this.localityCost != null && this.localityCost.getMultiplier() > 0) {
      finder = this.regionFinder;
    }

    //The clusterState that is given to this method contains the state
    //of all the regions in the table(s) (that's true today)
    // Keep track of servers to iterate through them.
    Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
    if (!needsBalance(cluster)) {
      return null;
    }

    long startTime = EnvironmentEdgeManager.currentTime();

    initCosts(cluster);

    // Baseline cost of the untouched cluster; MAX_VALUE disables the early-out.
    double currentCost = computeCost(cluster, Double.MAX_VALUE);

    double initCost = currentCost;
    double newCost = currentCost;

    // Step budget: regions * stepsPerRegion * servers, capped by maxSteps.
    long computedMaxSteps = Math.min(this.maxSteps,
        ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
    // Perform a stochastic walk to see if we can get a good fit.
    long step;

    for (step = 0; step < computedMaxSteps; step++) {
      // Each step, a randomly chosen generator proposes one mutation.
      int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
      CandidateGenerator p = candidateGenerators[generatorIdx];
      Cluster.Action action = p.generate(cluster);

      if (action.type == Type.NULL) {
        continue;
      }

      cluster.doAction(action);
      updateCostsWithAction(cluster, action);

      newCost = computeCost(cluster, currentCost);

      // Should this be kept?
      if (newCost < currentCost) {
        currentCost = newCost;
      } else {
        // Put things back the way they were before.
        // TODO: undo by remembering old values
        Action undoAction = action.undoAction();
        cluster.doAction(undoAction);
        updateCostsWithAction(cluster, undoAction);
      }

      // Respect the wall-clock budget even when steps remain.
      if (EnvironmentEdgeManager.currentTime() - startTime >
          maxRunningTime) {
        break;
      }
    }

    long endTime = EnvironmentEdgeManager.currentTime();

    metricsBalancer.balanceCluster(endTime - startTime);

    // Only emit plans when the walk actually found a strictly cheaper configuration.
    if (initCost > currentCost) {
      plans = createRegionPlans(cluster);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Finished computing new load balance plan.  Computation took "
            + (endTime - startTime) + "ms to try " + step
            + " different iterations.  Found a solution that moves "
            + plans.size() + " regions; Going from a computed cost of "
            + initCost + " to a new cost of " + currentCost);
      }
      return plans;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Could not find a better load balance plan.  Tried "
          + step + " different configurations in " + (endTime - startTime)
          + "ms, and did not find anything with a computed cost less than " + initCost);
    }
    return null;
  }
317 
318   /**
319    * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
320    * state.
321    *
322    * @param cluster The state of the cluster
323    * @return List of RegionPlan's that represent the moves needed to get to desired final state.
324    */
325   private List<RegionPlan> createRegionPlans(Cluster cluster) {
326     List<RegionPlan> plans = new LinkedList<RegionPlan>();
327     for (int regionIndex = 0;
328          regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
329       int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
330       int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
331 
332       if (initialServerIndex != newServerIndex) {
333         HRegionInfo region = cluster.regions[regionIndex];
334         ServerName initialServer = cluster.servers[initialServerIndex];
335         ServerName newServer = cluster.servers[newServerIndex];
336 
337         if (LOG.isTraceEnabled()) {
338           LOG.trace("Moving Region " + region.getEncodedName() + " from server "
339               + initialServer.getHostname() + " to " + newServer.getHostname());
340         }
341         RegionPlan rp = new RegionPlan(region, initialServer, newServer);
342         plans.add(rp);
343       }
344     }
345     return plans;
346   }
347 
348   /**
349    * Store the current region loads.
350    */
351   private synchronized void updateRegionLoad() {
352     // We create a new hashmap so that regions that are no longer there are removed.
353     // However we temporarily need the old loads so we can use them to keep the rolling average.
354     Map<String, Deque<RegionLoad>> oldLoads = loads;
355     loads = new HashMap<String, Deque<RegionLoad>>();
356 
357     for (ServerName sn : clusterStatus.getServers()) {
358       ServerLoad sl = clusterStatus.getLoad(sn);
359       if (sl == null) {
360         continue;
361       }
362       for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
363         Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
364         if (rLoads == null) {
365           // There was nothing there
366           rLoads = new ArrayDeque<RegionLoad>();
367         } else if (rLoads.size() >= numRegionLoadsToRemember) {
368           rLoads.remove();
369         }
370         rLoads.add(entry.getValue());
371         loads.put(Bytes.toString(entry.getKey()), rLoads);
372 
373       }
374     }
375 
376     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
377       cost.setLoads(loads);
378     }
379   }
380 
381   protected void initCosts(Cluster cluster) {
382     for (CostFunction c:costFunctions) {
383       c.init(cluster);
384     }
385   }
386 
387   protected void updateCostsWithAction(Cluster cluster, Action action) {
388     for (CostFunction c : costFunctions) {
389       c.postAction(action);
390     }
391   }
392 
393   /**
394    * This is the main cost function.  It will compute a cost associated with a proposed cluster
395    * state.  All different costs will be combined with their multipliers to produce a double cost.
396    *
397    * @param cluster The state of the cluster
398    * @param previousCost the previous cost. This is used as an early out.
399    * @return a double of a cost associated with the proposed cluster state.  This cost is an
400    *         aggregate of all individual cost functions.
401    */
402   protected double computeCost(Cluster cluster, double previousCost) {
403     double total = 0;
404 
405     for (CostFunction c:costFunctions) {
406       if (c.getMultiplier() <= 0) {
407         continue;
408       }
409 
410       total += c.getMultiplier() * c.cost();
411 
412       if (total > previousCost) {
413         return total;
414       }
415     }
416     return total;
417   }
418 
419   /** Generates a candidate action to be applied to the cluster for cost function search */
420   abstract static class CandidateGenerator {
421     abstract Cluster.Action generate(Cluster cluster);
422 
423     /**
424      * From a list of regions pick a random one. Null can be returned which
425      * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
426      * rather than swap.
427      *
428      * @param cluster        The state of the cluster
429      * @param server         index of the server
430      * @param chanceOfNoSwap Chance that this will decide to try a move rather
431      *                       than a swap.
432      * @return a random {@link HRegionInfo} or null if an asymmetrical move is
433      *         suggested.
434      */
435     protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
436       // Check to see if this is just a move.
437       if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
438         // signal a move only.
439         return -1;
440       }
441       int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
442       return cluster.regionsPerServer[server][rand];
443 
444     }
445     protected int pickRandomServer(Cluster cluster) {
446       if (cluster.numServers < 1) {
447         return -1;
448       }
449 
450       return RANDOM.nextInt(cluster.numServers);
451     }
452 
453     protected int pickRandomRack(Cluster cluster) {
454       if (cluster.numRacks < 1) {
455         return -1;
456       }
457 
458       return RANDOM.nextInt(cluster.numRacks);
459     }
460 
461     protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
462       if (cluster.numServers < 2) {
463         return -1;
464       }
465       while (true) {
466         int otherServerIndex = pickRandomServer(cluster);
467         if (otherServerIndex != serverIndex) {
468           return otherServerIndex;
469         }
470       }
471     }
472 
473     protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
474       if (cluster.numRacks < 2) {
475         return -1;
476       }
477       while (true) {
478         int otherRackIndex = pickRandomRack(cluster);
479         if (otherRackIndex != rackIndex) {
480           return otherRackIndex;
481         }
482       }
483     }
484 
485     protected Cluster.Action pickRandomRegions(Cluster cluster,
486                                                        int thisServer,
487                                                        int otherServer) {
488       if (thisServer < 0 || otherServer < 0) {
489         return Cluster.NullAction;
490       }
491 
492       // Decide who is most likely to need another region
493       int thisRegionCount = cluster.getNumRegions(thisServer);
494       int otherRegionCount = cluster.getNumRegions(otherServer);
495 
496       // Assign the chance based upon the above
497       double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
498       double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
499 
500       int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
501       int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
502 
503       return getAction(thisServer, thisRegion, otherServer, otherRegion);
504     }
505 
506     protected Cluster.Action getAction (int fromServer, int fromRegion,
507         int toServer, int toRegion) {
508       if (fromServer < 0 || toServer < 0) {
509         return Cluster.NullAction;
510       }
511       if (fromRegion > 0 && toRegion > 0) {
512         return new Cluster.SwapRegionsAction(fromServer, fromRegion,
513           toServer, toRegion);
514       } else if (fromRegion > 0) {
515         return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
516       } else if (toRegion > 0) {
517         return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
518       } else {
519         return Cluster.NullAction;
520       }
521     }
522   }
523 
524   static class RandomCandidateGenerator extends CandidateGenerator {
525 
526     @Override
527     Cluster.Action generate(Cluster cluster) {
528 
529       int thisServer = pickRandomServer(cluster);
530 
531       // Pick the other server
532       int otherServer = pickOtherRandomServer(cluster, thisServer);
533 
534       return pickRandomRegions(cluster, thisServer, otherServer);
535     }
536   }
537 
538   static class LoadCandidateGenerator extends CandidateGenerator {
539 
540     @Override
541     Cluster.Action generate(Cluster cluster) {
542       cluster.sortServersByRegionCount();
543       int thisServer = pickMostLoadedServer(cluster, -1);
544       int otherServer = pickLeastLoadedServer(cluster, thisServer);
545 
546       return pickRandomRegions(cluster, thisServer, otherServer);
547     }
548 
549     private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
550       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
551 
552       int index = 0;
553       while (servers[index] == null || servers[index] == thisServer) {
554         index++;
555         if (index == servers.length) {
556           return -1;
557         }
558       }
559       return servers[index];
560     }
561 
562     private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
563       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
564 
565       int index = servers.length - 1;
566       while (servers[index] == null || servers[index] == thisServer) {
567         index--;
568         if (index < 0) {
569           return -1;
570         }
571       }
572       return servers[index];
573     }
574   }
575 
576   static class LocalityBasedCandidateGenerator extends CandidateGenerator {
577 
578     private MasterServices masterServices;
579 
580     LocalityBasedCandidateGenerator(MasterServices masterServices) {
581       this.masterServices = masterServices;
582     }
583 
584     @Override
585     Cluster.Action generate(Cluster cluster) {
586       if (this.masterServices == null) {
587         return Cluster.NullAction;
588       }
589       // Pick a random region server
590       int thisServer = pickRandomServer(cluster);
591 
592       // Pick a random region on this server
593       int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
594 
595       if (thisRegion == -1) {
596         return Cluster.NullAction;
597       }
598 
599       // Pick the server with the highest locality
600       int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion);
601 
602       if (otherServer == -1) {
603         return Cluster.NullAction;
604       }
605 
606       // pick an region on the other server to potentially swap
607       int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
608 
609       return getAction(thisServer, thisRegion, otherServer, otherRegion);
610     }
611 
612     private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
613       int[] regionLocations = cluster.regionLocations[thisRegion];
614 
615       if (regionLocations == null || regionLocations.length <= 1) {
616         return pickOtherRandomServer(cluster, thisServer);
617       }
618 
619       for (int loc : regionLocations) {
620         if (loc >= 0 && loc != thisServer) { // find the first suitable server
621           return loc;
622         }
623       }
624 
625       // no location found
626       return pickOtherRandomServer(cluster, thisServer);
627     }
628 
629     void setServices(MasterServices services) {
630       this.masterServices = services;
631     }
632   }
633 
  /**
   * Generates candidates which moves the replicas out of the region server for
   * co-hosted region replicas
   */
  static class RegionReplicaCandidateGenerator extends CandidateGenerator {

    // Fallback generator when no co-hosted replicas are found.
    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();

    /**
     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
     * (a group is a server, host or rack)
     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
     * primariesOfRegionsPerHost or primariesOfRegionsPerRack
     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
     * @return a regionIndex for the selected primary or -1 if there is no co-locating
     */
    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
        , int[] regionIndexToPrimaryIndex) {
      int currentPrimary = -1;
      int currentPrimaryIndex = -1;
      int selectedPrimaryIndex = -1;
      double currentLargestRandom = -1;
      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
      // ids for the regions hosted in server, a consecutive repetition means that replicas
      // are co-hosted
      // NOTE: j runs to length (inclusive) so the final run of equal primaries is flushed
      // by the sentinel value -1 on the last iteration.
      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
        int primary = j < primariesOfRegionsPerGroup.length
            ? primariesOfRegionsPerGroup[j] : -1;
        if (primary != currentPrimary) { // check for whether we see a new primary
          // Length of the run that just ended (run start is currentPrimaryIndex).
          int numReplicas = j - currentPrimaryIndex;
          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
            // decide to select this primary region id or not
            double currentRandom = RANDOM.nextDouble();
            // we don't know how many region replicas are co-hosted, we will randomly select one
            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
            if (currentRandom > currentLargestRandom) {
              selectedPrimaryIndex = currentPrimary;
              currentLargestRandom = currentRandom;
            }
          }
          currentPrimary = primary;
          currentPrimaryIndex = j;
        }
      }

      // we have found the primary id for the region to move. Now find the actual regionIndex
      // with the given primary, prefer to move the secondary region.
      for (int j = 0; j < regionsPerGroup.length; j++) {
        int regionIndex = regionsPerGroup[j];
        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
          // always move the secondary, not the primary
          if (selectedPrimaryIndex != regionIndex) {
            return regionIndex;
          }
        }
      }
      return -1;
    }

    @Override
    Cluster.Action generate(Cluster cluster) {
      int serverIndex = pickRandomServer(cluster);
      if (cluster.numServers <= 1 || serverIndex == -1) {
        return Cluster.NullAction;
      }

      int regionIndex = selectCoHostedRegionPerGroup(
        cluster.primariesOfRegionsPerServer[serverIndex],
        cluster.regionsPerServer[serverIndex],
        cluster.regionIndexToPrimaryIndex);

      // if there are no pairs of region replicas co-hosted, default to random generator
      if (regionIndex == -1) {
        // default to randompicker
        return randomGenerator.generate(cluster);
      }

      // Send the co-hosted replica to some other server; 0.9 chance of a plain move
      // (rather than a swap back).
      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
      return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
    }
  }
717 
  /**
   * Generates candidates which moves the replicas out of the rack for
   * co-hosted region replicas in the same rack
   */
  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
    @Override
    Cluster.Action generate(Cluster cluster) {
      int rackIndex = pickRandomRack(cluster);
      // With a single rack (or none) rack-level separation is meaningless;
      // fall back to the server-level co-hosting generator.
      if (cluster.numRacks <= 1 || rackIndex == -1) {
        return super.generate(cluster);
      }

      int regionIndex = selectCoHostedRegionPerGroup(
        cluster.primariesOfRegionsPerRack[rackIndex],
        cluster.regionsPerRack[rackIndex],
        cluster.regionIndexToPrimaryIndex);

      // if there are no pairs of region replicas co-hosted, default to random generator
      if (regionIndex == -1) {
        // default to randompicker
        return randomGenerator.generate(cluster);
      }

      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
      // numRacks > 1 here, so pickOtherRandomRack cannot return -1.
      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);

      // Random server in the destination rack; 0.9 chance of a plain move (no swap back).
      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
      return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
    }
  }
750 
  /**
   * Base class of StochasticLoadBalancer's Cost Functions.
   */
  abstract static class CostFunction {

    // Weight applied to this function's cost in computeCost(); 0 disables it.
    private float multiplier = 0;

    // Cluster under evaluation; set by init() once per balancer invocation.
    protected Cluster cluster;

    CostFunction(Configuration c) {

    }

    float getMultiplier() {
      return multiplier;
    }

    void setMultiplier(float m) {
      this.multiplier = m;
    }
771 
    /** Called once per LB invocation to give the cost function
     * a chance to initialize its state, and perform any costly calculation.
     */
    void init(Cluster cluster) {
      this.cluster = cluster;
    }
778 
779     /** Called once per cluster Action to give the cost function
780      * an opportunity to update it's state. postAction() is always
781      * called at least once before cost() is called with the cluster
782      * that this action is performed on. */
783     void postAction(Action action) {
784       switch (action.type) {
785       case NULL: break;
786       case ASSIGN_REGION:
787         AssignRegionAction ar = (AssignRegionAction) action;
788         regionMoved(ar.region, -1, ar.server);
789         break;
790       case MOVE_REGION:
791         MoveRegionAction mra = (MoveRegionAction) action;
792         regionMoved(mra.region, mra.fromServer, mra.toServer);
793         break;
794       case SWAP_REGIONS:
795         SwapRegionsAction a = (SwapRegionsAction) action;
796         regionMoved(a.fromRegion, a.fromServer, a.toServer);
797         regionMoved(a.toRegion, a.toServer, a.fromServer);
798         break;
799       default:
800         throw new RuntimeException("Uknown action:" + action.type);
801       }
802     }
803 
804     protected void regionMoved(int region, int oldServer, int newServer) {
805     }
806 
807     abstract double cost();
808 
809     /**
810      * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
811      * assumes that this is a zero sum set of costs.  It assumes that the worst case
812      * possible is all of the elements in one region server and the rest having 0.
813      *
814      * @param stats the costs
815      * @return a scaled set of costs.
816      */
817     protected double costFromArray(double[] stats) {
818       double totalCost = 0;
819       double total = getSum(stats);
820 
821       double count = stats.length;
822       double mean = total/count;
823 
824       // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
825       // a zero sum cost for this to make sense.
826       double max = ((count - 1) * mean) + (total - mean);
827 
828       // It's possible that there aren't enough regions to go around
829       double min;
830       if (count > total) {
831         min = ((count - total) * mean) + ((1 - mean) * total);
832       } else {
833         // Some will have 1 more than everything else.
834         int numHigh = (int) (total - (Math.floor(mean) * count));
835         int numLow = (int) (count - numHigh);
836 
837         min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
838 
839       }
840       min = Math.max(0, min);
841       for (int i=0; i<stats.length; i++) {
842         double n = stats[i];
843         double diff = Math.abs(mean - n);
844         totalCost += diff;
845       }
846 
847       double scaled =  scale(min, max, totalCost);
848       return scaled;
849     }
850 
851     private double getSum(double[] stats) {
852       double total = 0;
853       for(double s:stats) {
854         total += s;
855       }
856       return total;
857     }
858 
859     /**
860      * Scale the value between 0 and 1.
861      *
862      * @param min   Min value
863      * @param max   The Max value
864      * @param value The value to be scaled.
865      * @return The scaled value.
866      */
867     protected double scale(double min, double max, double value) {
868       if (max <= min || value <= min) {
869         return 0;
870       }
871       if ((max - min) == 0) return 0;
872 
873       return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
874     }
875   }
876 
877   /**
878    * Given the starting state of the regions and a potential ending state
879    * compute cost based upon the number of regions that have moved.
880    */
881   static class MoveCostFunction extends CostFunction {
882     private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
883     private static final String MAX_MOVES_PERCENT_KEY =
884         "hbase.master.balancer.stochastic.maxMovePercent";
885     private static final float DEFAULT_MOVE_COST = 100;
886     private static final int DEFAULT_MAX_MOVES = 600;
887     private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
888 
889     private final float maxMovesPercent;
890 
891     MoveCostFunction(Configuration conf) {
892       super(conf);
893 
894       // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
895       // that large benefits are need to overcome the cost of a move.
896       this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
897       // What percent of the number of regions a single run of the balancer can move.
898       maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
899     }
900 
901     @Override
902     double cost() {
903       // Try and size the max number of Moves, but always be prepared to move some.
904       int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
905           DEFAULT_MAX_MOVES);
906 
907       double moveCost = cluster.numMovedRegions;
908 
909       // Don't let this single balance move more than the max moves.
910       // This allows better scaling to accurately represent the actual cost of a move.
911       if (moveCost > maxMoves) {
912         return 1000000;   // return a number much greater than any of the other cost
913       }
914 
915       return scale(0, cluster.numRegions, moveCost);
916     }
917   }
918 
919   /**
920    * Compute the cost of a potential cluster state from skew in number of
921    * regions on a cluster.
922    */
923   static class RegionCountSkewCostFunction extends CostFunction {
924     private static final String REGION_COUNT_SKEW_COST_KEY =
925         "hbase.master.balancer.stochastic.regionCountCost";
926     private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
927 
928     private double[] stats = null;
929 
930     RegionCountSkewCostFunction(Configuration conf) {
931       super(conf);
932       // Load multiplier should be the greatest as it is the most general way to balance data.
933       this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
934     }
935 
936     @Override
937     double cost() {
938       if (stats == null || stats.length != cluster.numServers) {
939         stats = new double[cluster.numServers];
940       }
941 
942       for (int i =0; i < cluster.numServers; i++) {
943         stats[i] = cluster.regionsPerServer[i].length;
944       }
945 
946       return costFromArray(stats);
947     }
948   }
949 
950   /**
951    * Compute the cost of a potential cluster state from skew in number of
952    * primary regions on a cluster.
953    */
954   static class PrimaryRegionCountSkewCostFunction extends CostFunction {
955     private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
956         "hbase.master.balancer.stochastic.primaryRegionCountCost";
957     private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
958 
959     private double[] stats = null;
960 
961     PrimaryRegionCountSkewCostFunction(Configuration conf) {
962       super(conf);
963       // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
964       this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
965         DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
966     }
967 
968     @Override
969     double cost() {
970       if (!cluster.hasRegionReplicas) {
971         return 0;
972       }
973       if (stats == null || stats.length != cluster.numServers) {
974         stats = new double[cluster.numServers];
975       }
976 
977       for (int i =0; i < cluster.numServers; i++) {
978         stats[i] = 0;
979         for (int regionIdx : cluster.regionsPerServer[i]) {
980           if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
981             stats[i] ++;
982           }
983         }
984       }
985 
986       return costFromArray(stats);
987     }
988   }
989 
990   /**
991    * Compute the cost of a potential cluster configuration based upon how evenly
992    * distributed tables are.
993    */
994   static class TableSkewCostFunction extends CostFunction {
995 
996     private static final String TABLE_SKEW_COST_KEY =
997         "hbase.master.balancer.stochastic.tableSkewCost";
998     private static final float DEFAULT_TABLE_SKEW_COST = 35;
999 
1000     TableSkewCostFunction(Configuration conf) {
1001       super(conf);
1002       this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1003     }
1004 
1005     @Override
1006     double cost() {
1007       double max = cluster.numRegions;
1008       double min = ((double) cluster.numRegions) / cluster.numServers;
1009       double value = 0;
1010 
1011       for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1012         value += cluster.numMaxRegionsPerTable[i];
1013       }
1014 
1015       return scale(min, max, value);
1016     }
1017   }
1018 
1019   /**
1020    * Compute a cost of a potential cluster configuration based upon where
1021    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
1022    */
1023   static class LocalityCostFunction extends CostFunction {
1024 
1025     private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1026     private static final float DEFAULT_LOCALITY_COST = 25;
1027 
1028     private MasterServices services;
1029 
1030     LocalityCostFunction(Configuration conf, MasterServices srv) {
1031       super(conf);
1032       this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
1033       this.services = srv;
1034     }
1035 
1036     void setServices(MasterServices srvc) {
1037       this.services = srvc;
1038     }
1039 
1040     @Override
1041     double cost() {
1042       double max = 0;
1043       double cost = 0;
1044 
1045       // If there's no master so there's no way anything else works.
1046       if (this.services == null) {
1047         return cost;
1048       }
1049 
1050       for (int i = 0; i < cluster.regionLocations.length; i++) {
1051         max += 1;
1052         int serverIndex = cluster.regionIndexToServerIndex[i];
1053         int[] regionLocations = cluster.regionLocations[i];
1054 
1055         // If we can't find where the data is getTopBlock returns null.
1056         // so count that as being the best possible.
1057         if (regionLocations == null) {
1058           continue;
1059         }
1060 
1061         int index = -1;
1062         for (int j = 0; j < regionLocations.length; j++) {
1063           if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
1064             index = j;
1065             break;
1066           }
1067         }
1068 
1069         if (index < 0) {
1070           if (regionLocations.length > 0) {
1071             cost += 1;
1072           }
1073         } else {
1074           cost += (double) index / (double) regionLocations.length;
1075         }
1076       }
1077       return scale(0, max, cost);
1078     }
1079   }
1080 
1081   /**
1082    * Base class the allows writing costs functions from rolling average of some
1083    * number from RegionLoad.
1084    */
1085   abstract static class CostFromRegionLoadFunction extends CostFunction {
1086 
1087     private ClusterStatus clusterStatus = null;
1088     private Map<String, Deque<RegionLoad>> loads = null;
1089     private double[] stats = null;
1090     CostFromRegionLoadFunction(Configuration conf) {
1091       super(conf);
1092     }
1093 
1094     void setClusterStatus(ClusterStatus status) {
1095       this.clusterStatus = status;
1096     }
1097 
1098     void setLoads(Map<String, Deque<RegionLoad>> l) {
1099       this.loads = l;
1100     }
1101 
1102     @Override
1103     double cost() {
1104       if (clusterStatus == null || loads == null) {
1105         return 0;
1106       }
1107 
1108       if (stats == null || stats.length != cluster.numServers) {
1109         stats = new double[cluster.numServers];
1110       }
1111 
1112       for (int i =0; i < stats.length; i++) {
1113         //Cost this server has from RegionLoad
1114         long cost = 0;
1115 
1116         // for every region on this server get the rl
1117         for(int regionIndex:cluster.regionsPerServer[i]) {
1118           Collection<RegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1119 
1120           // Now if we found a region load get the type of cost that was requested.
1121           if (regionLoadList != null) {
1122             cost += getRegionLoadCost(regionLoadList);
1123           }
1124         }
1125 
1126         // Add the total cost to the stats.
1127         stats[i] = cost;
1128       }
1129 
1130       // Now return the scaled cost from data held in the stats object.
1131       return costFromArray(stats);
1132     }
1133 
1134     protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
1135       double cost = 0;
1136 
1137       for (RegionLoad rl : regionLoadList) {
1138         double toAdd = getCostFromRl(rl);
1139 
1140         if (cost == 0) {
1141           cost = toAdd;
1142         } else {
1143           cost = (.5 * cost) + (.5 * toAdd);
1144         }
1145       }
1146 
1147       return cost;
1148     }
1149 
1150     protected abstract double getCostFromRl(RegionLoad rl);
1151   }
1152 
1153   /**
1154    * Compute the cost of total number of read requests  The more unbalanced the higher the
1155    * computed cost will be.  This uses a rolling average of regionload.
1156    */
1157 
1158   static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
1159 
1160     private static final String READ_REQUEST_COST_KEY =
1161         "hbase.master.balancer.stochastic.readRequestCost";
1162     private static final float DEFAULT_READ_REQUEST_COST = 5;
1163 
1164     ReadRequestCostFunction(Configuration conf) {
1165       super(conf);
1166       this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1167     }
1168 
1169 
1170     @Override
1171     protected double getCostFromRl(RegionLoad rl) {
1172       return rl.getReadRequestsCount();
1173     }
1174   }
1175 
1176   /**
1177    * Compute the cost of total number of write requests.  The more unbalanced the higher the
1178    * computed cost will be.  This uses a rolling average of regionload.
1179    */
1180   static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
1181 
1182     private static final String WRITE_REQUEST_COST_KEY =
1183         "hbase.master.balancer.stochastic.writeRequestCost";
1184     private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1185 
1186     WriteRequestCostFunction(Configuration conf) {
1187       super(conf);
1188       this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1189     }
1190 
1191     @Override
1192     protected double getCostFromRl(RegionLoad rl) {
1193       return rl.getWriteRequestsCount();
1194     }
1195   }
1196 
1197   /**
1198    * A cost function for region replicas. We give a very high cost to hosting
1199    * replicas of the same region in the same host. We do not prevent the case
1200    * though, since if numReplicas > numRegionServers, we still want to keep the
1201    * replica open.
1202    */
1203   static class RegionReplicaHostCostFunction extends CostFunction {
1204     private static final String REGION_REPLICA_HOST_COST_KEY =
1205         "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1206     private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1207 
1208     long maxCost = 0;
1209     long[] costsPerGroup; // group is either server, host or rack
1210     int[][] primariesOfRegionsPerGroup;
1211 
1212     public RegionReplicaHostCostFunction(Configuration conf) {
1213       super(conf);
1214       this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1215         DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1216     }
1217 
1218     @Override
1219     void init(Cluster cluster) {
1220       super.init(cluster);
1221       // max cost is the case where every region replica is hosted together regardless of host
1222       maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1223       costsPerGroup = new long[cluster.numHosts];
1224       primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1225           ? cluster.primariesOfRegionsPerHost
1226           : cluster.primariesOfRegionsPerServer;
1227       for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1228         costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1229       }
1230     }
1231 
1232     long getMaxCost(Cluster cluster) {
1233       if (!cluster.hasRegionReplicas) {
1234         return 0; // short circuit
1235       }
1236       // max cost is the case where every region replica is hosted together regardless of host
1237       int[] primariesOfRegions = new int[cluster.numRegions];
1238       System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1239           cluster.regions.length);
1240 
1241       Arrays.sort(primariesOfRegions);
1242 
1243       // compute numReplicas from the sorted array
1244       return costPerGroup(primariesOfRegions);
1245     }
1246 
1247     @Override
1248     double cost() {
1249       if (maxCost <= 0) {
1250         return 0;
1251       }
1252 
1253       long totalCost = 0;
1254       for (int i = 0 ; i < costsPerGroup.length; i++) {
1255         totalCost += costsPerGroup[i];
1256       }
1257       return scale(0, maxCost, totalCost);
1258     }
1259 
1260     /**
1261      * For each primary region, it computes the total number of replicas in the array (numReplicas)
1262      * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1263      * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1264      * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1265      * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1266      * @return a sum of numReplicas-1 squared for each primary region in the group.
1267      */
1268     protected long costPerGroup(int[] primariesOfRegions) {
1269       long cost = 0;
1270       int currentPrimary = -1;
1271       int currentPrimaryIndex = -1;
1272       // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1273       // sharing the same primary will have consecutive numbers in the array.
1274       for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1275         int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1276         if (primary != currentPrimary) { // we see a new primary
1277           int numReplicas = j - currentPrimaryIndex;
1278           // square the cost
1279           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1280             cost += (numReplicas - 1) * (numReplicas - 1);
1281           }
1282           currentPrimary = primary;
1283           currentPrimaryIndex = j;
1284         }
1285       }
1286 
1287       return cost;
1288     }
1289 
1290     @Override
1291     protected void regionMoved(int region, int oldServer, int newServer) {
1292       if (maxCost <= 0) {
1293         return; // no need to compute
1294       }
1295       if (cluster.multiServersPerHost) {
1296         int oldHost = cluster.serverIndexToHostIndex[oldServer];
1297         int newHost = cluster.serverIndexToHostIndex[newServer];
1298         if (newHost != oldHost) {
1299           costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1300           costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1301         }
1302       } else {
1303         costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1304         costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1305       }
1306     }
1307   }
1308 
1309   /**
1310    * A cost function for region replicas for the rack distribution. We give a relatively high
1311    * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1312    * though.
1313    */
1314   static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1315     private static final String REGION_REPLICA_RACK_COST_KEY =
1316         "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1317     private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1318 
1319     public RegionReplicaRackCostFunction(Configuration conf) {
1320       super(conf);
1321       this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1322     }
1323 
1324     @Override
1325     void init(Cluster cluster) {
1326       this.cluster = cluster;
1327       if (cluster.numRacks <= 1) {
1328         maxCost = 0;
1329         return; // disabled for 1 rack
1330       }
1331       // max cost is the case where every region replica is hosted together regardless of rack
1332       maxCost = getMaxCost(cluster);
1333       costsPerGroup = new long[cluster.numRacks];
1334       for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1335         costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1336       }
1337     }
1338 
1339     @Override
1340     protected void regionMoved(int region, int oldServer, int newServer) {
1341       if (maxCost <= 0) {
1342         return; // no need to compute
1343       }
1344       int oldRack = cluster.serverIndexToRackIndex[oldServer];
1345       int newRack = cluster.serverIndexToRackIndex[newServer];
1346       if (newRack != oldRack) {
1347         costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1348         costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1349       }
1350     }
1351   }
1352 
1353   /**
1354    * Compute the cost of total memstore size.  The more unbalanced the higher the
1355    * computed cost will be.  This uses a rolling average of regionload.
1356    */
1357   static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
1358 
1359     private static final String MEMSTORE_SIZE_COST_KEY =
1360         "hbase.master.balancer.stochastic.memstoreSizeCost";
1361     private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1362 
1363     MemstoreSizeCostFunction(Configuration conf) {
1364       super(conf);
1365       this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1366     }
1367 
1368     @Override
1369     protected double getCostFromRl(RegionLoad rl) {
1370       return rl.getMemStoreSizeMB();
1371     }
1372   }
1373   /**
1374    * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1375    * computed cost will be.  This uses a rolling average of regionload.
1376    */
1377   static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1378 
1379     private static final String STOREFILE_SIZE_COST_KEY =
1380         "hbase.master.balancer.stochastic.storefileSizeCost";
1381     private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1382 
1383     StoreFileCostFunction(Configuration conf) {
1384       super(conf);
1385       this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1386     }
1387 
1388     @Override
1389     protected double getCostFromRl(RegionLoad rl) {
1390       return rl.getStorefileSizeMB();
1391     }
1392   }
1393 }