View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayDeque;
21  import java.util.Arrays;
22  import java.util.Collection;
23  import java.util.Deque;
24  import java.util.HashMap;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Map.Entry;
29  import java.util.Random;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.ClusterStatus;
36  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
37  import org.apache.hadoop.hbase.HRegionInfo;
38  import org.apache.hadoop.hbase.RegionLoad;
39  import org.apache.hadoop.hbase.ServerLoad;
40  import org.apache.hadoop.hbase.ServerName;
41  import org.apache.hadoop.hbase.master.MasterServices;
42  import org.apache.hadoop.hbase.master.RegionPlan;
43  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
44  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
45  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
46  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
47  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  
51  /**
52   * <p>This is a best effort load balancer. Given a Cost function F(C) =&gt; x It will
53   * randomly try and mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
55   * <ul>
56   * <li>Region Load</li>
57   * <li>Table Load</li>
58   * <li>Data Locality</li>
59   * <li>Memstore Sizes</li>
60   * <li>Storefile Sizes</li>
61   * </ul>
62   *
63   *
64   * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
65   * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
66   * scaled by their respective multipliers:</p>
67   *
68   * <ul>
69   *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
70   *   <li>hbase.master.balancer.stochastic.moveCost</li>
71   *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
72   *   <li>hbase.master.balancer.stochastic.localityCost</li>
73   *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
74   *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
75   * </ul>
76   *
77   * <p>In addition to the above configurations, the balancer can be tuned by the following
78   * configuration values:</p>
79   * <ul>
80   *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
81   *   controls what the max number of regions that can be moved in a single invocation of this
82   *   balancer.</li>
83   *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
84   *   regions is multiplied to try and get the number of times the balancer will
85   *   mutate all servers.</li>
86   *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
87   *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
88   *   value and the above computation.</li>
89   * </ul>
90   *
91   * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
92   * so that the balancer gets the full picture of all loads on the cluster.</p>
93   */
94  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
95  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
96    justification="Complaint is about costFunctions not being synchronized; not end of the world")
97  public class StochasticLoadBalancer extends BaseLoadBalancer {
98  
  // Configuration keys for tuning the stochastic walk; the corresponding
  // default values live in the fields further down.
  protected static final String STEPS_PER_REGION_KEY =
      "hbase.master.balancer.stochastic.stepsPerRegion";
  protected static final String MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.maxSteps";
  protected static final String MAX_RUNNING_TIME_KEY =
      "hbase.master.balancer.stochastic.maxRunningTime";
  protected static final String KEEP_REGION_LOADS =
      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";

  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);

  // Rolling window of recent RegionLoads per region, keyed by the string form of
  // the region name bytes (see updateRegionLoad()); fed into the
  // CostFromRegionLoadFunctions so they can average over recent history.
  Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();

  // values are defaults
  private int maxSteps = 1000000;
  private int stepsPerRegion = 800;
  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
  private int numRegionLoadsToRemember = 15;

  // Mutation proposers used by the stochastic walk in balanceCluster().
  private CandidateGenerator[] candidateGenerators;
  // Subset of costFunctions that consume region loads; kept separately so
  // updateRegionLoad()/setClusterStatus() can push fresh data into them.
  private CostFromRegionLoadFunction[] regionLoadFunctions;

  private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedCandidateGenerator localityCandidateGenerator;
  private LocalityCostFunction localityCost;
  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
130 
  @Override
  public void onConfigurationChange(Configuration conf) {
    // Re-run the full setConf() wiring so the tunables and the cost-function
    // multipliers pick up the changed values.
    setConf(conf);
  }
135 
  /**
   * Loads (or reloads) the balancer's tunables and rebuilds the cost functions.
   * Safe to call repeatedly: numeric knobs default to their current values, and
   * the candidate generators are created only once.
   */
  @Override
  public synchronized void setConf(Configuration conf) {
    super.setConf(conf);
    LOG.info("loading config");

    // Each getter falls back to the current field value, so a reload keeps
    // prior settings unless the key is present in the new configuration.
    maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);

    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);

    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);

    // Created once and reused across reconfigurations so setMasterServices()
    // keeps pointing at the same generator instance.
    if (localityCandidateGenerator == null) {
      localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
    }
    localityCost = new LocalityCostFunction(conf, services);

    if (candidateGenerators == null) {
      candidateGenerators = new CandidateGenerator[] {
          new RandomCandidateGenerator(),
          new LoadCandidateGenerator(),
          localityCandidateGenerator,
          new RegionReplicaRackCandidateGenerator(),
      };
    }

    // Kept in their own array so updateRegionLoad()/setClusterStatus() can
    // push refreshed load data into them.
    regionLoadFunctions = new CostFromRegionLoadFunction[] {
      new ReadRequestCostFunction(conf),
      new WriteRequestCostFunction(conf),
      new MemstoreSizeCostFunction(conf),
      new StoreFileCostFunction(conf)
    };

    // Replica co-location functions are also used directly by
    // areSomeRegionReplicasColocated(), hence the dedicated fields.
    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);

    // Full set combined by computeCost(); note localityCost and the
    // regionLoadFunctions entries are shared with the fields above.
    costFunctions = new CostFunction[]{
      new RegionCountSkewCostFunction(conf),
      new PrimaryRegionCountSkewCostFunction(conf),
      new MoveCostFunction(conf),
      localityCost,
      new TableSkewCostFunction(conf),
      regionReplicaHostCostFunction,
      regionReplicaRackCostFunction,
      regionLoadFunctions[0],
      regionLoadFunctions[1],
      regionLoadFunctions[2],
      regionLoadFunctions[3],
    };
  }
186 
  @Override
  protected void setSlop(Configuration conf) {
    // Near-zero default so even tiny imbalances let the balancer run and the
    // cost search decide (slop is consumed by BaseLoadBalancer.needsBalance —
    // NOTE(review): semantics live in the base class, not visible here).
    this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
  }
191 
192   @Override
193   public synchronized void setClusterStatus(ClusterStatus st) {
194     super.setClusterStatus(st);
195     updateRegionLoad();
196     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
197       cost.setClusterStatus(st);
198     }
199   }
200 
  @Override
  public synchronized void setMasterServices(MasterServices masterServices) {
    super.setMasterServices(masterServices);
    // Propagate the services handle to the locality-aware cost function and
    // candidate generator, which need it to compute localities.
    this.localityCost.setServices(masterServices);
    this.localityCandidateGenerator.setServices(masterServices);

  }
208 
209   @Override
210   protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
211     regionReplicaHostCostFunction.init(c);
212     if (regionReplicaHostCostFunction.cost() > 0) return true;
213     regionReplicaRackCostFunction.init(c);
214     if (regionReplicaRackCostFunction.cost() > 0) return true;
215     return false;
216   }
217 
  /**
   * Given the cluster state this will try and approach an optimal balance. This
   * should always approach the optimal state given enough steps.
   *
   * <p>Performs a bounded stochastic walk: repeatedly asks a random candidate
   * generator for a mutation, applies it, and keeps it only if the aggregate
   * cost went down; otherwise the action is undone. Stops after
   * {@code computedMaxSteps} steps or {@code maxRunningTime} ms, whichever
   * comes first. Returns plans only if a strictly cheaper state was found.</p>
   */
  @Override
  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
    List<HRegionInfo>> clusterState) {
    // Master-carried regions are handled by the base class first; if that
    // produced a plan, or there is nothing meaningful to balance, stop here.
    List<RegionPlan> plans = balanceMasterRegions(clusterState);
    if (plans != null || clusterState == null || clusterState.size() <= 1) {
      return plans;
    }
    if (masterServerName != null && clusterState.containsKey(masterServerName)) {
      if (clusterState.size() <= 2) {
        return null;
      }
      // Copy before mutating so the caller's map is left untouched.
      clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
      clusterState.remove(masterServerName);
    }

    // On clusters with lots of HFileLinks or lots of reference files,
    // instantiating the storefile infos can be quite expensive.
    // Allow turning this feature off if the locality cost is not going to
    // be used in any computations.
    RegionLocationFinder finder = null;
    if (this.localityCost != null && this.localityCost.getMultiplier() > 0) {
      finder = this.regionFinder;
    }

    //The clusterState that is given to this method contains the state
    //of all the regions in the table(s) (that's true today)
    // Keep track of servers to iterate through them.
    Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
    if (!needsBalance(cluster)) {
      return null;
    }

    long startTime = EnvironmentEdgeManager.currentTime();

    initCosts(cluster);

    // Double.MAX_VALUE disables the early-out bound for the first evaluation.
    double currentCost = computeCost(cluster, Double.MAX_VALUE);

    double initCost = currentCost;
    double newCost = currentCost;

    // Work scales with cluster size but is capped by the maxSteps setting.
    long computedMaxSteps = Math.min(this.maxSteps,
        ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
    // Perform a stochastic walk to see if we can get a good fit.
    long step;

    for (step = 0; step < computedMaxSteps; step++) {
      // Each generator proposes a different flavor of mutation; pick one at random.
      int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
      CandidateGenerator p = candidateGenerators[generatorIdx];
      Cluster.Action action = p.generate(cluster);

      if (action.type == Type.NULL) {
        continue;
      }

      cluster.doAction(action);
      updateCostsWithAction(cluster, action);

      // currentCost doubles as the early-out bound inside computeCost().
      newCost = computeCost(cluster, currentCost);

      // Should this be kept?
      if (newCost < currentCost) {
        currentCost = newCost;
      } else {
        // Put things back the way they were before.
        // TODO: undo by remembering old values
        Action undoAction = action.undoAction();
        cluster.doAction(undoAction);
        updateCostsWithAction(cluster, undoAction);
      }

      // Honor the wall-clock budget even if step budget remains.
      if (EnvironmentEdgeManager.currentTime() - startTime >
          maxRunningTime) {
        break;
      }
    }
    long endTime = EnvironmentEdgeManager.currentTime();

    metricsBalancer.balanceCluster(endTime - startTime);

    // Emit plans only if the walk found a strictly cheaper configuration.
    if (initCost > currentCost) {
      plans = createRegionPlans(cluster);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Finished computing new load balance plan.  Computation took "
            + (endTime - startTime) + "ms to try " + step
            + " different iterations.  Found a solution that moves "
            + plans.size() + " regions; Going from a computed cost of "
            + initCost + " to a new cost of " + currentCost);
      }
      return plans;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Could not find a better load balance plan.  Tried "
          + step + " different configurations in " + (endTime - startTime)
          + "ms, and did not find anything with a computed cost less than " + initCost);
    }
    return null;
  }
320 
321   /**
322    * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
323    * state.
324    *
325    * @param cluster The state of the cluster
326    * @return List of RegionPlan's that represent the moves needed to get to desired final state.
327    */
328   private List<RegionPlan> createRegionPlans(Cluster cluster) {
329     List<RegionPlan> plans = new LinkedList<RegionPlan>();
330     for (int regionIndex = 0;
331          regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
332       int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
333       int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
334 
335       if (initialServerIndex != newServerIndex) {
336         HRegionInfo region = cluster.regions[regionIndex];
337         ServerName initialServer = cluster.servers[initialServerIndex];
338         ServerName newServer = cluster.servers[newServerIndex];
339 
340         if (LOG.isTraceEnabled()) {
341           LOG.trace("Moving Region " + region.getEncodedName() + " from server "
342               + initialServer.getHostname() + " to " + newServer.getHostname());
343         }
344         RegionPlan rp = new RegionPlan(region, initialServer, newServer);
345         plans.add(rp);
346       }
347     }
348     return plans;
349   }
350 
351   /**
352    * Store the current region loads.
353    */
354   private synchronized void updateRegionLoad() {
355     // We create a new hashmap so that regions that are no longer there are removed.
356     // However we temporarily need the old loads so we can use them to keep the rolling average.
357     Map<String, Deque<RegionLoad>> oldLoads = loads;
358     loads = new HashMap<String, Deque<RegionLoad>>();
359 
360     for (ServerName sn : clusterStatus.getServers()) {
361       ServerLoad sl = clusterStatus.getLoad(sn);
362       if (sl == null) {
363         continue;
364       }
365       for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
366         Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
367         if (rLoads == null) {
368           // There was nothing there
369           rLoads = new ArrayDeque<RegionLoad>();
370         } else if (rLoads.size() >= numRegionLoadsToRemember) {
371           rLoads.remove();
372         }
373         rLoads.add(entry.getValue());
374         loads.put(Bytes.toString(entry.getKey()), rLoads);
375 
376       }
377     }
378 
379     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
380       cost.setLoads(loads);
381     }
382   }
383 
384   protected void initCosts(Cluster cluster) {
385     for (CostFunction c:costFunctions) {
386       c.init(cluster);
387     }
388   }
389 
390   protected void updateCostsWithAction(Cluster cluster, Action action) {
391     for (CostFunction c : costFunctions) {
392       c.postAction(action);
393     }
394   }
395 
396   /**
397    * This is the main cost function.  It will compute a cost associated with a proposed cluster
398    * state.  All different costs will be combined with their multipliers to produce a double cost.
399    *
400    * @param cluster The state of the cluster
401    * @param previousCost the previous cost. This is used as an early out.
402    * @return a double of a cost associated with the proposed cluster state.  This cost is an
403    *         aggregate of all individual cost functions.
404    */
405   protected double computeCost(Cluster cluster, double previousCost) {
406     double total = 0;
407 
408     for (CostFunction c:costFunctions) {
409       if (c.getMultiplier() <= 0) {
410         continue;
411       }
412 
413       total += c.getMultiplier() * c.cost();
414 
415       if (total > previousCost) {
416         return total;
417       }
418     }
419     return total;
420   }
421 
422   /** Generates a candidate action to be applied to the cluster for cost function search */
423   abstract static class CandidateGenerator {
424     abstract Cluster.Action generate(Cluster cluster);
425 
426     /**
427      * From a list of regions pick a random one. Null can be returned which
428      * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
429      * rather than swap.
430      *
431      * @param cluster        The state of the cluster
432      * @param server         index of the server
433      * @param chanceOfNoSwap Chance that this will decide to try a move rather
434      *                       than a swap.
435      * @return a random {@link HRegionInfo} or null if an asymmetrical move is
436      *         suggested.
437      */
438     protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
439       // Check to see if this is just a move.
440       if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
441         // signal a move only.
442         return -1;
443       }
444       int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
445       return cluster.regionsPerServer[server][rand];
446 
447     }
448     protected int pickRandomServer(Cluster cluster) {
449       if (cluster.numServers < 1) {
450         return -1;
451       }
452 
453       return RANDOM.nextInt(cluster.numServers);
454     }
455 
456     protected int pickRandomRack(Cluster cluster) {
457       if (cluster.numRacks < 1) {
458         return -1;
459       }
460 
461       return RANDOM.nextInt(cluster.numRacks);
462     }
463 
464     protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
465       if (cluster.numServers < 2) {
466         return -1;
467       }
468       while (true) {
469         int otherServerIndex = pickRandomServer(cluster);
470         if (otherServerIndex != serverIndex) {
471           return otherServerIndex;
472         }
473       }
474     }
475 
476     protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
477       if (cluster.numRacks < 2) {
478         return -1;
479       }
480       while (true) {
481         int otherRackIndex = pickRandomRack(cluster);
482         if (otherRackIndex != rackIndex) {
483           return otherRackIndex;
484         }
485       }
486     }
487 
488     protected Cluster.Action pickRandomRegions(Cluster cluster,
489                                                        int thisServer,
490                                                        int otherServer) {
491       if (thisServer < 0 || otherServer < 0) {
492         return Cluster.NullAction;
493       }
494 
495       // Decide who is most likely to need another region
496       int thisRegionCount = cluster.getNumRegions(thisServer);
497       int otherRegionCount = cluster.getNumRegions(otherServer);
498 
499       // Assign the chance based upon the above
500       double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
501       double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
502 
503       int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
504       int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
505 
506       return getAction(thisServer, thisRegion, otherServer, otherRegion);
507     }
508 
509     protected Cluster.Action getAction (int fromServer, int fromRegion,
510         int toServer, int toRegion) {
511       if (fromServer < 0 || toServer < 0) {
512         return Cluster.NullAction;
513       }
514       if (fromRegion > 0 && toRegion > 0) {
515         return new Cluster.SwapRegionsAction(fromServer, fromRegion,
516           toServer, toRegion);
517       } else if (fromRegion > 0) {
518         return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
519       } else if (toRegion > 0) {
520         return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
521       } else {
522         return Cluster.NullAction;
523       }
524     }
525   }
526 
527   static class RandomCandidateGenerator extends CandidateGenerator {
528 
529     @Override
530     Cluster.Action generate(Cluster cluster) {
531 
532       int thisServer = pickRandomServer(cluster);
533 
534       // Pick the other server
535       int otherServer = pickOtherRandomServer(cluster, thisServer);
536 
537       return pickRandomRegions(cluster, thisServer, otherServer);
538     }
539   }
540 
541   static class LoadCandidateGenerator extends CandidateGenerator {
542 
543     @Override
544     Cluster.Action generate(Cluster cluster) {
545       cluster.sortServersByRegionCount();
546       int thisServer = pickMostLoadedServer(cluster, -1);
547       int otherServer = pickLeastLoadedServer(cluster, thisServer);
548 
549       return pickRandomRegions(cluster, thisServer, otherServer);
550     }
551 
552     private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
553       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
554 
555       int index = 0;
556       while (servers[index] == null || servers[index] == thisServer) {
557         index++;
558         if (index == servers.length) {
559           return -1;
560         }
561       }
562       return servers[index];
563     }
564 
565     private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
566       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
567 
568       int index = servers.length - 1;
569       while (servers[index] == null || servers[index] == thisServer) {
570         index--;
571         if (index < 0) {
572           return -1;
573         }
574       }
575       return servers[index];
576     }
577   }
578 
579   static class LocalityBasedCandidateGenerator extends CandidateGenerator {
580 
581     private MasterServices masterServices;
582 
583     LocalityBasedCandidateGenerator(MasterServices masterServices) {
584       this.masterServices = masterServices;
585     }
586 
587     @Override
588     Cluster.Action generate(Cluster cluster) {
589       if (this.masterServices == null) {
590         int thisServer = pickRandomServer(cluster);
591         // Pick the other server
592         int otherServer = pickOtherRandomServer(cluster, thisServer);
593         return pickRandomRegions(cluster, thisServer, otherServer);
594       }
595 
596       cluster.calculateRegionServerLocalities();
597       // Pick server with lowest locality
598       int thisServer = pickLowestLocalityServer(cluster);
599       int thisRegion;
600       if (thisServer == -1) {
601         LOG.warn("Could not pick lowest locality region server");
602         return Cluster.NullAction;
603       } else {
604       // Pick lowest locality region on this server
605         thisRegion = pickLowestLocalityRegionOnServer(cluster, thisServer);
606       }
607 
608       if (thisRegion == -1) {
609         return Cluster.NullAction;
610       }
611 
612       // Pick the least loaded server with good locality for the region
613       int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion);
614 
615       if (otherServer == -1) {
616         return Cluster.NullAction;
617       }
618 
619       // Let the candidate region be moved to its highest locality server.
620       int otherRegion = -1;
621 
622       return getAction(thisServer, thisRegion, otherServer, otherRegion);
623     }
624 
625     private int pickLowestLocalityServer(Cluster cluster) {
626       return cluster.getLowestLocalityRegionServer();
627     }
628 
629     private int pickLowestLocalityRegionOnServer(Cluster cluster, int server) {
630       return cluster.getLowestLocalityRegionOnServer(server);
631     }
632 
633     void setServices(MasterServices services) {
634       this.masterServices = services;
635     }
636   }
637 
  /**
   * Generates candidates which moves the replicas out of the region server for
   * co-hosted region replicas
   */
  static class RegionReplicaCandidateGenerator extends CandidateGenerator {

    // Fallback used when the chosen group has no co-hosted replicas.
    RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();

    /**
     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
     * (a group is a server, host or rack)
     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
     * primariesOfRegionsPerHost or primariesOfRegionsPerRack
     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
     * @return a regionIndex for the selected primary or -1 if there is no co-locating
     */
    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
        , int[] regionIndexToPrimaryIndex) {
      int currentPrimary = -1;       // primary id of the run currently being scanned
      int currentPrimaryIndex = -1;  // position where the current run of equal primaries began
      // NOTE: despite the name, this holds the selected primary *id*, not an array index.
      int selectedPrimaryIndex = -1;
      double currentLargestRandom = -1;
      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
      // ids for the regions hosted in server, a consecutive repetition means that replicas
      // are co-hosted
      // The loop deliberately runs one extra iteration (j == length) with a -1 sentinel
      // so that the trailing run of equal primaries is also evaluated.
      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
        int primary = j < primariesOfRegionsPerGroup.length
            ? primariesOfRegionsPerGroup[j] : -1;
        if (primary != currentPrimary) { // check for whether we see a new primary
          int numReplicas = j - currentPrimaryIndex;
          if (numReplicas > 1) { // means consecutive primaries, indicating co-location
            // decide to select this primary region id or not
            double currentRandom = RANDOM.nextDouble();
            // we don't know how many region replicas are co-hosted, we will randomly select one
            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
            if (currentRandom > currentLargestRandom) {
              selectedPrimaryIndex = currentPrimary;
              currentLargestRandom = currentRandom;
            }
          }
          currentPrimary = primary;
          currentPrimaryIndex = j;
        }
      }

      // we have found the primary id for the region to move. Now find the actual regionIndex
      // with the given primary, prefer to move the secondary region.
      for (int j = 0; j < regionsPerGroup.length; j++) {
        int regionIndex = regionsPerGroup[j];
        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
          // always move the secondary, not the primary
          if (selectedPrimaryIndex != regionIndex) {
            return regionIndex;
          }
        }
      }
      return -1;
    }

    @Override
    Cluster.Action generate(Cluster cluster) {
      int serverIndex = pickRandomServer(cluster);
      if (cluster.numServers <= 1 || serverIndex == -1) {
        return Cluster.NullAction;
      }

      // Look for replicas of the same primary co-hosted on the chosen server.
      int regionIndex = selectCoHostedRegionPerGroup(
        cluster.primariesOfRegionsPerServer[serverIndex],
        cluster.regionsPerServer[serverIndex],
        cluster.regionIndexToPrimaryIndex);

      // if there are no pairs of region replicas co-hosted, default to random generator
      if (regionIndex == -1) {
        // default to randompicker
        return randomGenerator.generate(cluster);
      }

      // 0.9 chance of a plain move (no region swapped back from the target server).
      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
      return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
    }
  }
721 
722   /**
723    * Generates candidates which moves the replicas out of the rack for
724    * co-hosted region replicas in the same rack
725    */
726   static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
727     @Override
728     Cluster.Action generate(Cluster cluster) {
729       int rackIndex = pickRandomRack(cluster);
730       if (cluster.numRacks <= 1 || rackIndex == -1) {
731         return super.generate(cluster);
732       }
733 
734       int regionIndex = selectCoHostedRegionPerGroup(
735         cluster.primariesOfRegionsPerRack[rackIndex],
736         cluster.regionsPerRack[rackIndex],
737         cluster.regionIndexToPrimaryIndex);
738 
739       // if there are no pairs of region replicas co-hosted, default to random generator
740       if (regionIndex == -1) {
741         // default to randompicker
742         return randomGenerator.generate(cluster);
743       }
744 
745       int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
746       int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
747 
748       int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
749       int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
750       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
751       return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
752     }
753   }
754 
755   /**
756    * Base class of StochasticLoadBalancer's Cost Functions.
757    */
758   abstract static class CostFunction {
759 
    // Weight applied to this function's cost when aggregated in computeCost();
    // a value <= 0 disables the function.
    private float multiplier = 0;

    // Cluster currently being evaluated; set by init().
    protected Cluster cluster;

    CostFunction(Configuration c) {
      // Nothing to do at this level; the Configuration is consumed by subclasses.
    }
767 
    /** @return the weight applied to this cost function; {@code <= 0} means disabled. */
    float getMultiplier() {
      return multiplier;
    }

    /** Set the weight applied to this cost function's result. */
    void setMultiplier(float m) {
      this.multiplier = m;
    }
775 
    /** Called once per LB invocation to give the cost function
     * to initialize its state, and perform any costly calculation.
     * @param cluster the cluster snapshot this invocation will evaluate
     */
    void init(Cluster cluster) {
      this.cluster = cluster;
    }
782 
783     /** Called once per cluster Action to give the cost function
784      * an opportunity to update it's state. postAction() is always
785      * called at least once before cost() is called with the cluster
786      * that this action is performed on. */
787     void postAction(Action action) {
788       switch (action.type) {
789       case NULL: break;
790       case ASSIGN_REGION:
791         AssignRegionAction ar = (AssignRegionAction) action;
792         regionMoved(ar.region, -1, ar.server);
793         break;
794       case MOVE_REGION:
795         MoveRegionAction mra = (MoveRegionAction) action;
796         regionMoved(mra.region, mra.fromServer, mra.toServer);
797         break;
798       case SWAP_REGIONS:
799         SwapRegionsAction a = (SwapRegionsAction) action;
800         regionMoved(a.fromRegion, a.fromServer, a.toServer);
801         regionMoved(a.toRegion, a.toServer, a.fromServer);
802         break;
803       default:
804         throw new RuntimeException("Uknown action:" + action.type);
805       }
806     }
807 
808     protected void regionMoved(int region, int oldServer, int newServer) {
809     }
810 
811     abstract double cost();
812 
813     /**
814      * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
815      * assumes that this is a zero sum set of costs.  It assumes that the worst case
816      * possible is all of the elements in one region server and the rest having 0.
817      *
818      * @param stats the costs
819      * @return a scaled set of costs.
820      */
821     protected double costFromArray(double[] stats) {
822       double totalCost = 0;
823       double total = getSum(stats);
824 
825       double count = stats.length;
826       double mean = total/count;
827 
828       // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
829       // a zero sum cost for this to make sense.
830       double max = ((count - 1) * mean) + (total - mean);
831 
832       // It's possible that there aren't enough regions to go around
833       double min;
834       if (count > total) {
835         min = ((count - total) * mean) + ((1 - mean) * total);
836       } else {
837         // Some will have 1 more than everything else.
838         int numHigh = (int) (total - (Math.floor(mean) * count));
839         int numLow = (int) (count - numHigh);
840 
841         min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
842 
843       }
844       min = Math.max(0, min);
845       for (int i=0; i<stats.length; i++) {
846         double n = stats[i];
847         double diff = Math.abs(mean - n);
848         totalCost += diff;
849       }
850 
851       double scaled =  scale(min, max, totalCost);
852       return scaled;
853     }
854 
855     private double getSum(double[] stats) {
856       double total = 0;
857       for(double s:stats) {
858         total += s;
859       }
860       return total;
861     }
862 
863     /**
864      * Scale the value between 0 and 1.
865      *
866      * @param min   Min value
867      * @param max   The Max value
868      * @param value The value to be scaled.
869      * @return The scaled value.
870      */
871     protected double scale(double min, double max, double value) {
872       if (max <= min || value <= min) {
873         return 0;
874       }
875       if ((max - min) == 0) return 0;
876 
877       return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
878     }
879   }
880 
881   /**
882    * Given the starting state of the regions and a potential ending state
883    * compute cost based upon the number of regions that have moved.
884    */
885   static class MoveCostFunction extends CostFunction {
886     private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
887     private static final String MAX_MOVES_PERCENT_KEY =
888         "hbase.master.balancer.stochastic.maxMovePercent";
889     private static final float DEFAULT_MOVE_COST = 100;
890     private static final int DEFAULT_MAX_MOVES = 600;
891     private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
892 
893     private final float maxMovesPercent;
894 
895     MoveCostFunction(Configuration conf) {
896       super(conf);
897 
898       // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
899       // that large benefits are need to overcome the cost of a move.
900       this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
901       // What percent of the number of regions a single run of the balancer can move.
902       maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
903     }
904 
905     @Override
906     double cost() {
907       // Try and size the max number of Moves, but always be prepared to move some.
908       int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
909           DEFAULT_MAX_MOVES);
910 
911       double moveCost = cluster.numMovedRegions;
912 
913       // Don't let this single balance move more than the max moves.
914       // This allows better scaling to accurately represent the actual cost of a move.
915       if (moveCost > maxMoves) {
916         return 1000000;   // return a number much greater than any of the other cost
917       }
918 
919       return scale(0, cluster.numRegions, moveCost);
920     }
921   }
922 
923   /**
924    * Compute the cost of a potential cluster state from skew in number of
925    * regions on a cluster.
926    */
927   static class RegionCountSkewCostFunction extends CostFunction {
928     private static final String REGION_COUNT_SKEW_COST_KEY =
929         "hbase.master.balancer.stochastic.regionCountCost";
930     private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
931 
932     private double[] stats = null;
933 
934     RegionCountSkewCostFunction(Configuration conf) {
935       super(conf);
936       // Load multiplier should be the greatest as it is the most general way to balance data.
937       this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
938     }
939 
940     @Override
941     double cost() {
942       if (stats == null || stats.length != cluster.numServers) {
943         stats = new double[cluster.numServers];
944       }
945 
946       for (int i =0; i < cluster.numServers; i++) {
947         stats[i] = cluster.regionsPerServer[i].length;
948       }
949 
950       return costFromArray(stats);
951     }
952   }
953 
954   /**
955    * Compute the cost of a potential cluster state from skew in number of
956    * primary regions on a cluster.
957    */
958   static class PrimaryRegionCountSkewCostFunction extends CostFunction {
959     private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
960         "hbase.master.balancer.stochastic.primaryRegionCountCost";
961     private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
962 
963     private double[] stats = null;
964 
965     PrimaryRegionCountSkewCostFunction(Configuration conf) {
966       super(conf);
967       // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
968       this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
969         DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
970     }
971 
972     @Override
973     double cost() {
974       if (!cluster.hasRegionReplicas) {
975         return 0;
976       }
977       if (stats == null || stats.length != cluster.numServers) {
978         stats = new double[cluster.numServers];
979       }
980 
981       for (int i =0; i < cluster.numServers; i++) {
982         stats[i] = 0;
983         for (int regionIdx : cluster.regionsPerServer[i]) {
984           if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
985             stats[i] ++;
986           }
987         }
988       }
989 
990       return costFromArray(stats);
991     }
992   }
993 
994   /**
995    * Compute the cost of a potential cluster configuration based upon how evenly
996    * distributed tables are.
997    */
998   static class TableSkewCostFunction extends CostFunction {
999 
1000     private static final String TABLE_SKEW_COST_KEY =
1001         "hbase.master.balancer.stochastic.tableSkewCost";
1002     private static final float DEFAULT_TABLE_SKEW_COST = 35;
1003 
1004     TableSkewCostFunction(Configuration conf) {
1005       super(conf);
1006       this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1007     }
1008 
1009     @Override
1010     double cost() {
1011       double max = cluster.numRegions;
1012       double min = ((double) cluster.numRegions) / cluster.numServers;
1013       double value = 0;
1014 
1015       for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1016         value += cluster.numMaxRegionsPerTable[i];
1017       }
1018 
1019       return scale(min, max, value);
1020     }
1021   }
1022 
1023   /**
1024    * Compute a cost of a potential cluster configuration based upon where
1025    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
1026    */
1027   static class LocalityCostFunction extends CostFunction {
1028 
1029     private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1030     private static final float DEFAULT_LOCALITY_COST = 25;
1031 
1032     private MasterServices services;
1033 
1034     LocalityCostFunction(Configuration conf, MasterServices srv) {
1035       super(conf);
1036       this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
1037       this.services = srv;
1038     }
1039 
1040     void setServices(MasterServices srvc) {
1041       this.services = srvc;
1042     }
1043 
1044     @Override
1045     double cost() {
1046       double max = 0;
1047       double cost = 0;
1048 
1049       // If there's no master so there's no way anything else works.
1050       if (this.services == null) {
1051         return cost;
1052       }
1053 
1054       for (int i = 0; i < cluster.regionLocations.length; i++) {
1055         max += 1;
1056         int serverIndex = cluster.regionIndexToServerIndex[i];
1057         int[] regionLocations = cluster.regionLocations[i];
1058 
1059         // If we can't find where the data is getTopBlock returns null.
1060         // so count that as being the best possible.
1061         if (regionLocations == null) {
1062           continue;
1063         }
1064 
1065         int index = -1;
1066         for (int j = 0; j < regionLocations.length; j++) {
1067           if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
1068             index = j;
1069             break;
1070           }
1071         }
1072 
1073         if (index < 0) {
1074           cost += 1;
1075         } else {
1076           cost += (1 - cluster.getLocalityOfRegion(i, index));
1077         }
1078       }
1079       return scale(0, max, cost);
1080     }
1081   }
1082 
1083   /**
1084    * Base class the allows writing costs functions from rolling average of some
1085    * number from RegionLoad.
1086    */
1087   abstract static class CostFromRegionLoadFunction extends CostFunction {
1088 
1089     private ClusterStatus clusterStatus = null;
1090     private Map<String, Deque<RegionLoad>> loads = null;
1091     private double[] stats = null;
1092     CostFromRegionLoadFunction(Configuration conf) {
1093       super(conf);
1094     }
1095 
1096     void setClusterStatus(ClusterStatus status) {
1097       this.clusterStatus = status;
1098     }
1099 
1100     void setLoads(Map<String, Deque<RegionLoad>> l) {
1101       this.loads = l;
1102     }
1103 
1104     @Override
1105     double cost() {
1106       if (clusterStatus == null || loads == null) {
1107         return 0;
1108       }
1109 
1110       if (stats == null || stats.length != cluster.numServers) {
1111         stats = new double[cluster.numServers];
1112       }
1113 
1114       for (int i =0; i < stats.length; i++) {
1115         //Cost this server has from RegionLoad
1116         long cost = 0;
1117 
1118         // for every region on this server get the rl
1119         for(int regionIndex:cluster.regionsPerServer[i]) {
1120           Collection<RegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1121 
1122           // Now if we found a region load get the type of cost that was requested.
1123           if (regionLoadList != null) {
1124             cost += getRegionLoadCost(regionLoadList);
1125           }
1126         }
1127 
1128         // Add the total cost to the stats.
1129         stats[i] = cost;
1130       }
1131 
1132       // Now return the scaled cost from data held in the stats object.
1133       return costFromArray(stats);
1134     }
1135 
1136     protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
1137       double cost = 0;
1138 
1139       for (RegionLoad rl : regionLoadList) {
1140         double toAdd = getCostFromRl(rl);
1141 
1142         if (cost == 0) {
1143           cost = toAdd;
1144         } else {
1145           cost = (.5 * cost) + (.5 * toAdd);
1146         }
1147       }
1148 
1149       return cost;
1150     }
1151 
1152     protected abstract double getCostFromRl(RegionLoad rl);
1153   }
1154 
1155   /**
1156    * Compute the cost of total number of read requests  The more unbalanced the higher the
1157    * computed cost will be.  This uses a rolling average of regionload.
1158    */
1159 
1160   static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
1161 
1162     private static final String READ_REQUEST_COST_KEY =
1163         "hbase.master.balancer.stochastic.readRequestCost";
1164     private static final float DEFAULT_READ_REQUEST_COST = 5;
1165 
1166     ReadRequestCostFunction(Configuration conf) {
1167       super(conf);
1168       this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1169     }
1170 
1171 
1172     @Override
1173     protected double getCostFromRl(RegionLoad rl) {
1174       return rl.getReadRequestsCount();
1175     }
1176   }
1177 
1178   /**
1179    * Compute the cost of total number of write requests.  The more unbalanced the higher the
1180    * computed cost will be.  This uses a rolling average of regionload.
1181    */
1182   static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
1183 
1184     private static final String WRITE_REQUEST_COST_KEY =
1185         "hbase.master.balancer.stochastic.writeRequestCost";
1186     private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1187 
1188     WriteRequestCostFunction(Configuration conf) {
1189       super(conf);
1190       this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1191     }
1192 
1193     @Override
1194     protected double getCostFromRl(RegionLoad rl) {
1195       return rl.getWriteRequestsCount();
1196     }
1197   }
1198 
1199   /**
1200    * A cost function for region replicas. We give a very high cost to hosting
1201    * replicas of the same region in the same host. We do not prevent the case
1202    * though, since if numReplicas > numRegionServers, we still want to keep the
1203    * replica open.
1204    */
1205   static class RegionReplicaHostCostFunction extends CostFunction {
1206     private static final String REGION_REPLICA_HOST_COST_KEY =
1207         "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1208     private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1209 
1210     long maxCost = 0;
1211     long[] costsPerGroup; // group is either server, host or rack
1212     int[][] primariesOfRegionsPerGroup;
1213 
1214     public RegionReplicaHostCostFunction(Configuration conf) {
1215       super(conf);
1216       this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1217         DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1218     }
1219 
1220     @Override
1221     void init(Cluster cluster) {
1222       super.init(cluster);
1223       // max cost is the case where every region replica is hosted together regardless of host
1224       maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1225       costsPerGroup = new long[cluster.numHosts];
1226       primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1227           ? cluster.primariesOfRegionsPerHost
1228           : cluster.primariesOfRegionsPerServer;
1229       for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1230         costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1231       }
1232     }
1233 
1234     long getMaxCost(Cluster cluster) {
1235       if (!cluster.hasRegionReplicas) {
1236         return 0; // short circuit
1237       }
1238       // max cost is the case where every region replica is hosted together regardless of host
1239       int[] primariesOfRegions = new int[cluster.numRegions];
1240       System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1241           cluster.regions.length);
1242 
1243       Arrays.sort(primariesOfRegions);
1244 
1245       // compute numReplicas from the sorted array
1246       return costPerGroup(primariesOfRegions);
1247     }
1248 
1249     @Override
1250     double cost() {
1251       if (maxCost <= 0) {
1252         return 0;
1253       }
1254 
1255       long totalCost = 0;
1256       for (int i = 0 ; i < costsPerGroup.length; i++) {
1257         totalCost += costsPerGroup[i];
1258       }
1259       return scale(0, maxCost, totalCost);
1260     }
1261 
1262     /**
1263      * For each primary region, it computes the total number of replicas in the array (numReplicas)
1264      * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1265      * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1266      * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1267      * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1268      * @return a sum of numReplicas-1 squared for each primary region in the group.
1269      */
1270     protected long costPerGroup(int[] primariesOfRegions) {
1271       long cost = 0;
1272       int currentPrimary = -1;
1273       int currentPrimaryIndex = -1;
1274       // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1275       // sharing the same primary will have consecutive numbers in the array.
1276       for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1277         int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1278         if (primary != currentPrimary) { // we see a new primary
1279           int numReplicas = j - currentPrimaryIndex;
1280           // square the cost
1281           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1282             cost += (numReplicas - 1) * (numReplicas - 1);
1283           }
1284           currentPrimary = primary;
1285           currentPrimaryIndex = j;
1286         }
1287       }
1288 
1289       return cost;
1290     }
1291 
1292     @Override
1293     protected void regionMoved(int region, int oldServer, int newServer) {
1294       if (maxCost <= 0) {
1295         return; // no need to compute
1296       }
1297       if (cluster.multiServersPerHost) {
1298         int oldHost = cluster.serverIndexToHostIndex[oldServer];
1299         int newHost = cluster.serverIndexToHostIndex[newServer];
1300         if (newHost != oldHost) {
1301           costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1302           costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1303         }
1304       } else {
1305         costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1306         costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1307       }
1308     }
1309   }
1310 
1311   /**
1312    * A cost function for region replicas for the rack distribution. We give a relatively high
1313    * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1314    * though.
1315    */
1316   static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1317     private static final String REGION_REPLICA_RACK_COST_KEY =
1318         "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1319     private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1320 
1321     public RegionReplicaRackCostFunction(Configuration conf) {
1322       super(conf);
1323       this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1324     }
1325 
1326     @Override
1327     void init(Cluster cluster) {
1328       this.cluster = cluster;
1329       if (cluster.numRacks <= 1) {
1330         maxCost = 0;
1331         return; // disabled for 1 rack
1332       }
1333       // max cost is the case where every region replica is hosted together regardless of rack
1334       maxCost = getMaxCost(cluster);
1335       costsPerGroup = new long[cluster.numRacks];
1336       for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1337         costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1338       }
1339     }
1340 
1341     @Override
1342     protected void regionMoved(int region, int oldServer, int newServer) {
1343       if (maxCost <= 0) {
1344         return; // no need to compute
1345       }
1346       int oldRack = cluster.serverIndexToRackIndex[oldServer];
1347       int newRack = cluster.serverIndexToRackIndex[newServer];
1348       if (newRack != oldRack) {
1349         costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1350         costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1351       }
1352     }
1353   }
1354 
1355   /**
1356    * Compute the cost of total memstore size.  The more unbalanced the higher the
1357    * computed cost will be.  This uses a rolling average of regionload.
1358    */
1359   static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
1360 
1361     private static final String MEMSTORE_SIZE_COST_KEY =
1362         "hbase.master.balancer.stochastic.memstoreSizeCost";
1363     private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1364 
1365     MemstoreSizeCostFunction(Configuration conf) {
1366       super(conf);
1367       this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1368     }
1369 
1370     @Override
1371     protected double getCostFromRl(RegionLoad rl) {
1372       return rl.getMemStoreSizeMB();
1373     }
1374   }
1375   /**
1376    * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1377    * computed cost will be.  This uses a rolling average of regionload.
1378    */
1379   static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1380 
1381     private static final String STOREFILE_SIZE_COST_KEY =
1382         "hbase.master.balancer.stochastic.storefileSizeCost";
1383     private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1384 
1385     StoreFileCostFunction(Configuration conf) {
1386       super(conf);
1387       this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1388     }
1389 
1390     @Override
1391     protected double getCostFromRl(RegionLoad rl) {
1392       return rl.getStorefileSizeMB();
1393     }
1394   }
1395 }