View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import java.util.ArrayDeque;
22  import java.util.Arrays;
23  import java.util.Collection;
24  import java.util.Deque;
25  import java.util.HashMap;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Map.Entry;
30  import java.util.Random;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.ClusterStatus;
37  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.RegionLoad;
40  import org.apache.hadoop.hbase.ServerLoad;
41  import org.apache.hadoop.hbase.ServerName;
42  import org.apache.hadoop.hbase.master.MasterServices;
43  import org.apache.hadoop.hbase.master.RegionPlan;
44  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
45  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
46  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
47  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
48  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  
52  /**
53   * <p>This is a best effort load balancer. Given a Cost function F(C) =&gt; x It will
54   * randomly try and mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
55   * new cluster state becomes the plan. It includes costs functions to compute the cost of:</p>
56   * <ul>
57   * <li>Region Load</li>
58   * <li>Table Load</li>
59   * <li>Data Locality</li>
60   * <li>Memstore Sizes</li>
61   * <li>Storefile Sizes</li>
62   * </ul>
63   *
64   *
65   * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
66   * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
67   * scaled by their respective multipliers:</p>
68   *
69   * <ul>
70   *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
71   *   <li>hbase.master.balancer.stochastic.moveCost</li>
72   *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
73   *   <li>hbase.master.balancer.stochastic.localityCost</li>
74   *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
75   *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
76   * </ul>
77   *
78   * <p>In addition to the above configurations, the balancer can be tuned by the following
79   * configuration values:</p>
80   * <ul>
81   *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
82   *   controls what the max number of regions that can be moved in a single invocation of this
83   *   balancer.</li>
84   *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
85   *   regions is multiplied to try and get the number of times the balancer will
86   *   mutate all servers.</li>
87   *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
88   *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
89   *   value and the above computation.</li>
90   * </ul>
91   *
92   * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
93   * so that the balancer gets the full picture of all loads on the cluster.</p>
94   */
95  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
96  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
97    justification="Complaint is about costFunctions not being synchronized; not end of the world")
98  public class StochasticLoadBalancer extends BaseLoadBalancer {
99  
100   protected static final String STEPS_PER_REGION_KEY =
101       "hbase.master.balancer.stochastic.stepsPerRegion";
102   protected static final String MAX_STEPS_KEY =
103       "hbase.master.balancer.stochastic.maxSteps";
104   protected static final String MAX_RUNNING_TIME_KEY =
105       "hbase.master.balancer.stochastic.maxRunningTime";
106   protected static final String KEEP_REGION_LOADS =
107       "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
108 
109   private static final Random RANDOM = new Random(System.currentTimeMillis());
110   private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
111 
112   Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
113 
114   // values are defaults
115   private int maxSteps = 1000000;
116   private int stepsPerRegion = 800;
117   private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
118   private int numRegionLoadsToRemember = 15;
119 
120   private CandidateGenerator[] candidateGenerators;
121   private CostFromRegionLoadFunction[] regionLoadFunctions;
122 
123   private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
124 
125   // Keep locality based picker and cost function to alert them
126   // when new services are offered
127   private LocalityBasedCandidateGenerator localityCandidateGenerator;
128   private LocalityCostFunction localityCost;
129   private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
130   private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
131 
132   @Override
133   public void onConfigurationChange(Configuration conf) {
134     setConf(conf);
135   }
136 
137   @Override
138   public synchronized void setConf(Configuration conf) {
139     super.setConf(conf);
140     LOG.info("loading config");
141 
142     maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
143 
144     stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
145     maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
146 
147     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
148 
149     if (localityCandidateGenerator == null) {
150       localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
151     }
152     localityCost = new LocalityCostFunction(conf, services);
153 
154     if (candidateGenerators == null) {
155       candidateGenerators = new CandidateGenerator[] {
156           new RandomCandidateGenerator(),
157           new LoadCandidateGenerator(),
158           localityCandidateGenerator,
159           new RegionReplicaRackCandidateGenerator(),
160       };
161     }
162 
163     regionLoadFunctions = new CostFromRegionLoadFunction[] {
164       new ReadRequestCostFunction(conf),
165       new WriteRequestCostFunction(conf),
166       new MemstoreSizeCostFunction(conf),
167       new StoreFileCostFunction(conf)
168     };
169 
170     regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
171     regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
172 
173     costFunctions = new CostFunction[]{
174       new RegionCountSkewCostFunction(conf),
175       new PrimaryRegionCountSkewCostFunction(conf),
176       new MoveCostFunction(conf),
177       localityCost,
178       new TableSkewCostFunction(conf),
179       regionReplicaHostCostFunction,
180       regionReplicaRackCostFunction,
181       regionLoadFunctions[0],
182       regionLoadFunctions[1],
183       regionLoadFunctions[2],
184       regionLoadFunctions[3],
185     };
186   }
187 
188   @Override
189   protected void setSlop(Configuration conf) {
190     this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
191   }
192 
193   @Override
194   public synchronized void setClusterStatus(ClusterStatus st) {
195     super.setClusterStatus(st);
196     updateRegionLoad();
197     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
198       cost.setClusterStatus(st);
199     }
200   }
201 
202   @Override
203   public synchronized void setMasterServices(MasterServices masterServices) {
204     super.setMasterServices(masterServices);
205     this.localityCost.setServices(masterServices);
206     this.localityCandidateGenerator.setServices(masterServices);
207 
208   }
209 
210   @Override
211   protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
212     regionReplicaHostCostFunction.init(c);
213     if (regionReplicaHostCostFunction.cost() > 0) return true;
214     regionReplicaRackCostFunction.init(c);
215     if (regionReplicaRackCostFunction.cost() > 0) return true;
216     return false;
217   }
218 
219   @VisibleForTesting
220   Cluster.Action nextAction(Cluster cluster) {
221     return candidateGenerators[(RANDOM.nextInt(candidateGenerators.length))]
222             .generate(cluster);
223   }
224 
225   /**
226    * Given the cluster state this will try and approach an optimal balance. This
227    * should always approach the optimal state given enough steps.
228    */
229   @Override
230   public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
231     List<HRegionInfo>> clusterState) {
232     List<RegionPlan> plans = balanceMasterRegions(clusterState);
233     if (plans != null || clusterState == null || clusterState.size() <= 1) {
234       return plans;
235     }
236     if (masterServerName != null && clusterState.containsKey(masterServerName)) {
237       if (clusterState.size() <= 2) {
238         return null;
239       }
240       clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
241       clusterState.remove(masterServerName);
242     }
243 
244     // On clusters with lots of HFileLinks or lots of reference files,
245     // instantiating the storefile infos can be quite expensive.
246     // Allow turning this feature off if the locality cost is not going to
247     // be used in any computations.
248     RegionLocationFinder finder = null;
249     if (this.localityCost != null && this.localityCost.getMultiplier() > 0) {
250       finder = this.regionFinder;
251     }
252 
253     //The clusterState that is given to this method contains the state
254     //of all the regions in the table(s) (that's true today)
255     // Keep track of servers to iterate through them.
256     Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
257     if (!needsBalance(cluster)) {
258       return null;
259     }
260 
261     long startTime = EnvironmentEdgeManager.currentTime();
262 
263     initCosts(cluster);
264 
265     double currentCost = computeCost(cluster, Double.MAX_VALUE);
266 
267     double initCost = currentCost;
268     double newCost = currentCost;
269 
270     long computedMaxSteps = Math.min(this.maxSteps,
271         ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
272     // Perform a stochastic walk to see if we can get a good fit.
273     long step;
274 
275     for (step = 0; step < computedMaxSteps; step++) {
276       Cluster.Action action = nextAction(cluster);
277 
278       if (action.type == Type.NULL) {
279         continue;
280       }
281 
282       cluster.doAction(action);
283       updateCostsWithAction(cluster, action);
284 
285       newCost = computeCost(cluster, currentCost);
286 
287       // Should this be kept?
288       if (newCost < currentCost) {
289         currentCost = newCost;
290       } else {
291         // Put things back the way they were before.
292         // TODO: undo by remembering old values
293         Action undoAction = action.undoAction();
294         cluster.doAction(undoAction);
295         updateCostsWithAction(cluster, undoAction);
296       }
297 
298       if (EnvironmentEdgeManager.currentTime() - startTime >
299           maxRunningTime) {
300         break;
301       }
302     }
303     long endTime = EnvironmentEdgeManager.currentTime();
304 
305     metricsBalancer.balanceCluster(endTime - startTime);
306 
307     if (initCost > currentCost) {
308       plans = createRegionPlans(cluster);
309       if (LOG.isDebugEnabled()) {
310         LOG.debug("Finished computing new load balance plan.  Computation took "
311             + (endTime - startTime) + "ms to try " + step
312             + " different iterations.  Found a solution that moves "
313             + plans.size() + " regions; Going from a computed cost of "
314             + initCost + " to a new cost of " + currentCost);
315       }
316       return plans;
317     }
318     if (LOG.isDebugEnabled()) {
319       LOG.debug("Could not find a better load balance plan.  Tried "
320           + step + " different configurations in " + (endTime - startTime)
321           + "ms, and did not find anything with a computed cost less than " + initCost);
322     }
323     return null;
324   }
325 
326   /**
327    * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
328    * state.
329    *
330    * @param cluster The state of the cluster
331    * @return List of RegionPlan's that represent the moves needed to get to desired final state.
332    */
333   private List<RegionPlan> createRegionPlans(Cluster cluster) {
334     List<RegionPlan> plans = new LinkedList<RegionPlan>();
335     for (int regionIndex = 0;
336          regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
337       int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
338       int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
339 
340       if (initialServerIndex != newServerIndex) {
341         HRegionInfo region = cluster.regions[regionIndex];
342         ServerName initialServer = cluster.servers[initialServerIndex];
343         ServerName newServer = cluster.servers[newServerIndex];
344 
345         if (LOG.isTraceEnabled()) {
346           LOG.trace("Moving Region " + region.getEncodedName() + " from server "
347               + initialServer.getHostname() + " to " + newServer.getHostname());
348         }
349         RegionPlan rp = new RegionPlan(region, initialServer, newServer);
350         plans.add(rp);
351       }
352     }
353     return plans;
354   }
355 
356   /**
357    * Store the current region loads.
358    */
359   private synchronized void updateRegionLoad() {
360     // We create a new hashmap so that regions that are no longer there are removed.
361     // However we temporarily need the old loads so we can use them to keep the rolling average.
362     Map<String, Deque<RegionLoad>> oldLoads = loads;
363     loads = new HashMap<String, Deque<RegionLoad>>();
364 
365     for (ServerName sn : clusterStatus.getServers()) {
366       ServerLoad sl = clusterStatus.getLoad(sn);
367       if (sl == null) {
368         continue;
369       }
370       for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
371         Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
372         if (rLoads == null) {
373           // There was nothing there
374           rLoads = new ArrayDeque<RegionLoad>();
375         } else if (rLoads.size() >= numRegionLoadsToRemember) {
376           rLoads.remove();
377         }
378         rLoads.add(entry.getValue());
379         loads.put(Bytes.toString(entry.getKey()), rLoads);
380 
381       }
382     }
383 
384     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
385       cost.setLoads(loads);
386     }
387   }
388 
389   protected void initCosts(Cluster cluster) {
390     for (CostFunction c:costFunctions) {
391       c.init(cluster);
392     }
393   }
394 
395   protected void updateCostsWithAction(Cluster cluster, Action action) {
396     for (CostFunction c : costFunctions) {
397       c.postAction(action);
398     }
399   }
400 
401   /**
402    * This is the main cost function.  It will compute a cost associated with a proposed cluster
403    * state.  All different costs will be combined with their multipliers to produce a double cost.
404    *
405    * @param cluster The state of the cluster
406    * @param previousCost the previous cost. This is used as an early out.
407    * @return a double of a cost associated with the proposed cluster state.  This cost is an
408    *         aggregate of all individual cost functions.
409    */
410   protected double computeCost(Cluster cluster, double previousCost) {
411     double total = 0;
412 
413     for (CostFunction c:costFunctions) {
414       if (c.getMultiplier() <= 0) {
415         continue;
416       }
417 
418       total += c.getMultiplier() * c.cost();
419 
420       if (total > previousCost) {
421         return total;
422       }
423     }
424     return total;
425   }
426 
427   /** Generates a candidate action to be applied to the cluster for cost function search */
428   abstract static class CandidateGenerator {
429     abstract Cluster.Action generate(Cluster cluster);
430 
431     /**
432      * From a list of regions pick a random one. Null can be returned which
433      * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
434      * rather than swap.
435      *
436      * @param cluster        The state of the cluster
437      * @param server         index of the server
438      * @param chanceOfNoSwap Chance that this will decide to try a move rather
439      *                       than a swap.
440      * @return a random {@link HRegionInfo} or null if an asymmetrical move is
441      *         suggested.
442      */
443     protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
444       // Check to see if this is just a move.
445       if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
446         // signal a move only.
447         return -1;
448       }
449       int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
450       return cluster.regionsPerServer[server][rand];
451 
452     }
453     protected int pickRandomServer(Cluster cluster) {
454       if (cluster.numServers < 1) {
455         return -1;
456       }
457 
458       return RANDOM.nextInt(cluster.numServers);
459     }
460 
461     protected int pickRandomRack(Cluster cluster) {
462       if (cluster.numRacks < 1) {
463         return -1;
464       }
465 
466       return RANDOM.nextInt(cluster.numRacks);
467     }
468 
469     protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
470       if (cluster.numServers < 2) {
471         return -1;
472       }
473       while (true) {
474         int otherServerIndex = pickRandomServer(cluster);
475         if (otherServerIndex != serverIndex) {
476           return otherServerIndex;
477         }
478       }
479     }
480 
481     protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
482       if (cluster.numRacks < 2) {
483         return -1;
484       }
485       while (true) {
486         int otherRackIndex = pickRandomRack(cluster);
487         if (otherRackIndex != rackIndex) {
488           return otherRackIndex;
489         }
490       }
491     }
492 
493     protected Cluster.Action pickRandomRegions(Cluster cluster,
494                                                        int thisServer,
495                                                        int otherServer) {
496       if (thisServer < 0 || otherServer < 0) {
497         return Cluster.NullAction;
498       }
499 
500       // Decide who is most likely to need another region
501       int thisRegionCount = cluster.getNumRegions(thisServer);
502       int otherRegionCount = cluster.getNumRegions(otherServer);
503 
504       // Assign the chance based upon the above
505       double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
506       double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
507 
508       int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
509       int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
510 
511       return getAction(thisServer, thisRegion, otherServer, otherRegion);
512     }
513 
514     protected Cluster.Action getAction (int fromServer, int fromRegion,
515         int toServer, int toRegion) {
516       if (fromServer < 0 || toServer < 0) {
517         return Cluster.NullAction;
518       }
519       if (fromRegion > 0 && toRegion > 0) {
520         return new Cluster.SwapRegionsAction(fromServer, fromRegion,
521           toServer, toRegion);
522       } else if (fromRegion > 0) {
523         return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
524       } else if (toRegion > 0) {
525         return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
526       } else {
527         return Cluster.NullAction;
528       }
529     }
530   }
531 
532   static class RandomCandidateGenerator extends CandidateGenerator {
533 
534     @Override
535     Cluster.Action generate(Cluster cluster) {
536 
537       int thisServer = pickRandomServer(cluster);
538 
539       // Pick the other server
540       int otherServer = pickOtherRandomServer(cluster, thisServer);
541 
542       return pickRandomRegions(cluster, thisServer, otherServer);
543     }
544   }
545 
546   static class LoadCandidateGenerator extends CandidateGenerator {
547 
548     @Override
549     Cluster.Action generate(Cluster cluster) {
550       cluster.sortServersByRegionCount();
551       int thisServer = pickMostLoadedServer(cluster, -1);
552       int otherServer = pickLeastLoadedServer(cluster, thisServer);
553 
554       return pickRandomRegions(cluster, thisServer, otherServer);
555     }
556 
557     private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
558       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
559 
560       int index = 0;
561       while (servers[index] == null || servers[index] == thisServer) {
562         index++;
563         if (index == servers.length) {
564           return -1;
565         }
566       }
567       return servers[index];
568     }
569 
570     private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
571       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
572 
573       int index = servers.length - 1;
574       while (servers[index] == null || servers[index] == thisServer) {
575         index--;
576         if (index < 0) {
577           return -1;
578         }
579       }
580       return servers[index];
581     }
582   }
583 
584   static class LocalityBasedCandidateGenerator extends CandidateGenerator {
585 
586     private MasterServices masterServices;
587 
588     LocalityBasedCandidateGenerator(MasterServices masterServices) {
589       this.masterServices = masterServices;
590     }
591 
592     @Override
593     Cluster.Action generate(Cluster cluster) {
594       if (this.masterServices == null) {
595         int thisServer = pickRandomServer(cluster);
596         // Pick the other server
597         int otherServer = pickOtherRandomServer(cluster, thisServer);
598         return pickRandomRegions(cluster, thisServer, otherServer);
599       }
600 
601       cluster.calculateRegionServerLocalities();
602       // Pick server with lowest locality
603       int thisServer = pickLowestLocalityServer(cluster);
604       int thisRegion;
605       if (thisServer == -1) {
606         LOG.warn("Could not pick lowest locality region server");
607         return Cluster.NullAction;
608       } else {
609       // Pick lowest locality region on this server
610         thisRegion = pickLowestLocalityRegionOnServer(cluster, thisServer);
611       }
612 
613       if (thisRegion == -1) {
614         return Cluster.NullAction;
615       }
616 
617       // Pick the least loaded server with good locality for the region
618       int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion);
619 
620       if (otherServer == -1) {
621         return Cluster.NullAction;
622       }
623 
624       // Let the candidate region be moved to its highest locality server.
625       int otherRegion = -1;
626 
627       return getAction(thisServer, thisRegion, otherServer, otherRegion);
628     }
629 
630     private int pickLowestLocalityServer(Cluster cluster) {
631       return cluster.getLowestLocalityRegionServer();
632     }
633 
634     private int pickLowestLocalityRegionOnServer(Cluster cluster, int server) {
635       return cluster.getLowestLocalityRegionOnServer(server);
636     }
637 
638     void setServices(MasterServices services) {
639       this.masterServices = services;
640     }
641   }
642 
643   /**
644    * Generates candidates which moves the replicas out of the region server for
645    * co-hosted region replicas
646    */
647   static class RegionReplicaCandidateGenerator extends CandidateGenerator {
648 
649     RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
650 
651     /**
652      * Randomly select one regionIndex out of all region replicas co-hosted in the same group
653      * (a group is a server, host or rack)
654      * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
655      * primariesOfRegionsPerHost or primariesOfRegionsPerRack
656      * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
657      * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
658      * @return a regionIndex for the selected primary or -1 if there is no co-locating
659      */
660     int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
661         , int[] regionIndexToPrimaryIndex) {
662       int currentPrimary = -1;
663       int currentPrimaryIndex = -1;
664       int selectedPrimaryIndex = -1;
665       double currentLargestRandom = -1;
666       // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
667       // ids for the regions hosted in server, a consecutive repetition means that replicas
668       // are co-hosted
669       for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
670         int primary = j < primariesOfRegionsPerGroup.length
671             ? primariesOfRegionsPerGroup[j] : -1;
672         if (primary != currentPrimary) { // check for whether we see a new primary
673           int numReplicas = j - currentPrimaryIndex;
674           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
675             // decide to select this primary region id or not
676             double currentRandom = RANDOM.nextDouble();
677             // we don't know how many region replicas are co-hosted, we will randomly select one
678             // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
679             if (currentRandom > currentLargestRandom) {
680               selectedPrimaryIndex = currentPrimary;
681               currentLargestRandom = currentRandom;
682             }
683           }
684           currentPrimary = primary;
685           currentPrimaryIndex = j;
686         }
687       }
688 
689       // we have found the primary id for the region to move. Now find the actual regionIndex
690       // with the given primary, prefer to move the secondary region.
691       for (int j = 0; j < regionsPerGroup.length; j++) {
692         int regionIndex = regionsPerGroup[j];
693         if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
694           // always move the secondary, not the primary
695           if (selectedPrimaryIndex != regionIndex) {
696             return regionIndex;
697           }
698         }
699       }
700       return -1;
701     }
702 
703     @Override
704     Cluster.Action generate(Cluster cluster) {
705       int serverIndex = pickRandomServer(cluster);
706       if (cluster.numServers <= 1 || serverIndex == -1) {
707         return Cluster.NullAction;
708       }
709 
710       int regionIndex = selectCoHostedRegionPerGroup(
711         cluster.primariesOfRegionsPerServer[serverIndex],
712         cluster.regionsPerServer[serverIndex],
713         cluster.regionIndexToPrimaryIndex);
714 
715       // if there are no pairs of region replicas co-hosted, default to random generator
716       if (regionIndex == -1) {
717         // default to randompicker
718         return randomGenerator.generate(cluster);
719       }
720 
721       int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
722       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
723       return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
724     }
725   }
726 
727   /**
728    * Generates candidates which moves the replicas out of the rack for
729    * co-hosted region replicas in the same rack
730    */
731   static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
732     @Override
733     Cluster.Action generate(Cluster cluster) {
734       int rackIndex = pickRandomRack(cluster);
735       if (cluster.numRacks <= 1 || rackIndex == -1) {
736         return super.generate(cluster);
737       }
738 
739       int regionIndex = selectCoHostedRegionPerGroup(
740         cluster.primariesOfRegionsPerRack[rackIndex],
741         cluster.regionsPerRack[rackIndex],
742         cluster.regionIndexToPrimaryIndex);
743 
744       // if there are no pairs of region replicas co-hosted, default to random generator
745       if (regionIndex == -1) {
746         // default to randompicker
747         return randomGenerator.generate(cluster);
748       }
749 
750       int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
751       int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
752 
753       int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
754       int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
755       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
756       return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
757     }
758   }
759 
760   /**
761    * Base class of StochasticLoadBalancer's Cost Functions.
762    */
763   abstract static class CostFunction {
764 
765     private float multiplier = 0;
766 
767     protected Cluster cluster;
768 
769     CostFunction(Configuration c) {
770 
771     }
772 
773     float getMultiplier() {
774       return multiplier;
775     }
776 
777     void setMultiplier(float m) {
778       this.multiplier = m;
779     }
780 
781     /** Called once per LB invocation to give the cost function
782      * to initialize it's state, and perform any costly calculation.
783      */
784     void init(Cluster cluster) {
785       this.cluster = cluster;
786     }
787 
788     /** Called once per cluster Action to give the cost function
789      * an opportunity to update it's state. postAction() is always
790      * called at least once before cost() is called with the cluster
791      * that this action is performed on. */
792     void postAction(Action action) {
793       switch (action.type) {
794       case NULL: break;
795       case ASSIGN_REGION:
796         AssignRegionAction ar = (AssignRegionAction) action;
797         regionMoved(ar.region, -1, ar.server);
798         break;
799       case MOVE_REGION:
800         MoveRegionAction mra = (MoveRegionAction) action;
801         regionMoved(mra.region, mra.fromServer, mra.toServer);
802         break;
803       case SWAP_REGIONS:
804         SwapRegionsAction a = (SwapRegionsAction) action;
805         regionMoved(a.fromRegion, a.fromServer, a.toServer);
806         regionMoved(a.toRegion, a.toServer, a.fromServer);
807         break;
808       default:
809         throw new RuntimeException("Uknown action:" + action.type);
810       }
811     }
812 
813     protected void regionMoved(int region, int oldServer, int newServer) {
814     }
815 
816     abstract double cost();
817 
818     /**
819      * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
820      * assumes that this is a zero sum set of costs.  It assumes that the worst case
821      * possible is all of the elements in one region server and the rest having 0.
822      *
823      * @param stats the costs
824      * @return a scaled set of costs.
825      */
826     protected double costFromArray(double[] stats) {
827       double totalCost = 0;
828       double total = getSum(stats);
829 
830       double count = stats.length;
831       double mean = total/count;
832 
833       // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
834       // a zero sum cost for this to make sense.
835       double max = ((count - 1) * mean) + (total - mean);
836 
837       // It's possible that there aren't enough regions to go around
838       double min;
839       if (count > total) {
840         min = ((count - total) * mean) + ((1 - mean) * total);
841       } else {
842         // Some will have 1 more than everything else.
843         int numHigh = (int) (total - (Math.floor(mean) * count));
844         int numLow = (int) (count - numHigh);
845 
846         min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
847 
848       }
849       min = Math.max(0, min);
850       for (int i=0; i<stats.length; i++) {
851         double n = stats[i];
852         double diff = Math.abs(mean - n);
853         totalCost += diff;
854       }
855 
856       double scaled =  scale(min, max, totalCost);
857       return scaled;
858     }
859 
860     private double getSum(double[] stats) {
861       double total = 0;
862       for(double s:stats) {
863         total += s;
864       }
865       return total;
866     }
867 
868     /**
869      * Scale the value between 0 and 1.
870      *
871      * @param min   Min value
872      * @param max   The Max value
873      * @param value The value to be scaled.
874      * @return The scaled value.
875      */
876     protected double scale(double min, double max, double value) {
877       if (max <= min || value <= min) {
878         return 0;
879       }
880       if ((max - min) == 0) return 0;
881 
882       return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
883     }
884   }
885 
886   /**
887    * Given the starting state of the regions and a potential ending state
888    * compute cost based upon the number of regions that have moved.
889    */
890   static class MoveCostFunction extends CostFunction {
891     private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
892     private static final String MAX_MOVES_PERCENT_KEY =
893         "hbase.master.balancer.stochastic.maxMovePercent";
894     private static final float DEFAULT_MOVE_COST = 100;
895     private static final int DEFAULT_MAX_MOVES = 600;
896     private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
897 
898     private final float maxMovesPercent;
899 
900     MoveCostFunction(Configuration conf) {
901       super(conf);
902 
903       // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
904       // that large benefits are need to overcome the cost of a move.
905       this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
906       // What percent of the number of regions a single run of the balancer can move.
907       maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
908     }
909 
910     @Override
911     double cost() {
912       // Try and size the max number of Moves, but always be prepared to move some.
913       int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
914           DEFAULT_MAX_MOVES);
915 
916       double moveCost = cluster.numMovedRegions;
917 
918       // Don't let this single balance move more than the max moves.
919       // This allows better scaling to accurately represent the actual cost of a move.
920       if (moveCost > maxMoves) {
921         return 1000000;   // return a number much greater than any of the other cost
922       }
923 
924       return scale(0, cluster.numRegions, moveCost);
925     }
926   }
927 
928   /**
929    * Compute the cost of a potential cluster state from skew in number of
930    * regions on a cluster.
931    */
932   static class RegionCountSkewCostFunction extends CostFunction {
933     private static final String REGION_COUNT_SKEW_COST_KEY =
934         "hbase.master.balancer.stochastic.regionCountCost";
935     private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
936 
937     private double[] stats = null;
938 
939     RegionCountSkewCostFunction(Configuration conf) {
940       super(conf);
941       // Load multiplier should be the greatest as it is the most general way to balance data.
942       this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
943     }
944 
945     @Override
946     double cost() {
947       if (stats == null || stats.length != cluster.numServers) {
948         stats = new double[cluster.numServers];
949       }
950 
951       for (int i =0; i < cluster.numServers; i++) {
952         stats[i] = cluster.regionsPerServer[i].length;
953       }
954 
955       return costFromArray(stats);
956     }
957   }
958 
959   /**
960    * Compute the cost of a potential cluster state from skew in number of
961    * primary regions on a cluster.
962    */
963   static class PrimaryRegionCountSkewCostFunction extends CostFunction {
964     private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
965         "hbase.master.balancer.stochastic.primaryRegionCountCost";
966     private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
967 
968     private double[] stats = null;
969 
970     PrimaryRegionCountSkewCostFunction(Configuration conf) {
971       super(conf);
972       // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
973       this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
974         DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
975     }
976 
977     @Override
978     double cost() {
979       if (!cluster.hasRegionReplicas) {
980         return 0;
981       }
982       if (stats == null || stats.length != cluster.numServers) {
983         stats = new double[cluster.numServers];
984       }
985 
986       for (int i =0; i < cluster.numServers; i++) {
987         stats[i] = 0;
988         for (int regionIdx : cluster.regionsPerServer[i]) {
989           if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
990             stats[i] ++;
991           }
992         }
993       }
994 
995       return costFromArray(stats);
996     }
997   }
998 
999   /**
1000    * Compute the cost of a potential cluster configuration based upon how evenly
1001    * distributed tables are.
1002    */
1003   static class TableSkewCostFunction extends CostFunction {
1004 
1005     private static final String TABLE_SKEW_COST_KEY =
1006         "hbase.master.balancer.stochastic.tableSkewCost";
1007     private static final float DEFAULT_TABLE_SKEW_COST = 35;
1008 
1009     TableSkewCostFunction(Configuration conf) {
1010       super(conf);
1011       this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1012     }
1013 
1014     @Override
1015     double cost() {
1016       double max = cluster.numRegions;
1017       double min = ((double) cluster.numRegions) / cluster.numServers;
1018       double value = 0;
1019 
1020       for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1021         value += cluster.numMaxRegionsPerTable[i];
1022       }
1023 
1024       return scale(min, max, value);
1025     }
1026   }
1027 
1028   /**
1029    * Compute a cost of a potential cluster configuration based upon where
1030    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
1031    */
1032   static class LocalityCostFunction extends CostFunction {
1033 
1034     private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1035     private static final float DEFAULT_LOCALITY_COST = 25;
1036 
1037     private MasterServices services;
1038 
1039     LocalityCostFunction(Configuration conf, MasterServices srv) {
1040       super(conf);
1041       this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
1042       this.services = srv;
1043     }
1044 
1045     void setServices(MasterServices srvc) {
1046       this.services = srvc;
1047     }
1048 
1049     @Override
1050     double cost() {
1051       double max = 0;
1052       double cost = 0;
1053 
1054       // If there's no master so there's no way anything else works.
1055       if (this.services == null) {
1056         return cost;
1057       }
1058 
1059       for (int i = 0; i < cluster.regionLocations.length; i++) {
1060         max += 1;
1061         int serverIndex = cluster.regionIndexToServerIndex[i];
1062         int[] regionLocations = cluster.regionLocations[i];
1063 
1064         // If we can't find where the data is getTopBlock returns null.
1065         // so count that as being the best possible.
1066         if (regionLocations == null) {
1067           continue;
1068         }
1069 
1070         int index = -1;
1071         for (int j = 0; j < regionLocations.length; j++) {
1072           if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
1073             index = j;
1074             break;
1075           }
1076         }
1077 
1078         if (index < 0) {
1079           cost += 1;
1080         } else {
1081           cost += (1 - cluster.getLocalityOfRegion(i, index));
1082         }
1083       }
1084       return scale(0, max, cost);
1085     }
1086   }
1087 
1088   /**
1089    * Base class the allows writing costs functions from rolling average of some
1090    * number from RegionLoad.
1091    */
1092   abstract static class CostFromRegionLoadFunction extends CostFunction {
1093 
1094     private ClusterStatus clusterStatus = null;
1095     private Map<String, Deque<RegionLoad>> loads = null;
1096     private double[] stats = null;
1097     CostFromRegionLoadFunction(Configuration conf) {
1098       super(conf);
1099     }
1100 
1101     void setClusterStatus(ClusterStatus status) {
1102       this.clusterStatus = status;
1103     }
1104 
1105     void setLoads(Map<String, Deque<RegionLoad>> l) {
1106       this.loads = l;
1107     }
1108 
1109     @Override
1110     double cost() {
1111       if (clusterStatus == null || loads == null) {
1112         return 0;
1113       }
1114 
1115       if (stats == null || stats.length != cluster.numServers) {
1116         stats = new double[cluster.numServers];
1117       }
1118 
1119       for (int i =0; i < stats.length; i++) {
1120         //Cost this server has from RegionLoad
1121         long cost = 0;
1122 
1123         // for every region on this server get the rl
1124         for(int regionIndex:cluster.regionsPerServer[i]) {
1125           Collection<RegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1126 
1127           // Now if we found a region load get the type of cost that was requested.
1128           if (regionLoadList != null) {
1129             cost += getRegionLoadCost(regionLoadList);
1130           }
1131         }
1132 
1133         // Add the total cost to the stats.
1134         stats[i] = cost;
1135       }
1136 
1137       // Now return the scaled cost from data held in the stats object.
1138       return costFromArray(stats);
1139     }
1140 
1141     protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
1142       double cost = 0;
1143 
1144       for (RegionLoad rl : regionLoadList) {
1145         double toAdd = getCostFromRl(rl);
1146 
1147         if (cost == 0) {
1148           cost = toAdd;
1149         } else {
1150           cost = (.5 * cost) + (.5 * toAdd);
1151         }
1152       }
1153 
1154       return cost;
1155     }
1156 
1157     protected abstract double getCostFromRl(RegionLoad rl);
1158   }
1159 
1160   /**
1161    * Compute the cost of total number of read requests  The more unbalanced the higher the
1162    * computed cost will be.  This uses a rolling average of regionload.
1163    */
1164 
1165   static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
1166 
1167     private static final String READ_REQUEST_COST_KEY =
1168         "hbase.master.balancer.stochastic.readRequestCost";
1169     private static final float DEFAULT_READ_REQUEST_COST = 5;
1170 
1171     ReadRequestCostFunction(Configuration conf) {
1172       super(conf);
1173       this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1174     }
1175 
1176 
1177     @Override
1178     protected double getCostFromRl(RegionLoad rl) {
1179       return rl.getReadRequestsCount();
1180     }
1181   }
1182 
1183   /**
1184    * Compute the cost of total number of write requests.  The more unbalanced the higher the
1185    * computed cost will be.  This uses a rolling average of regionload.
1186    */
1187   static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
1188 
1189     private static final String WRITE_REQUEST_COST_KEY =
1190         "hbase.master.balancer.stochastic.writeRequestCost";
1191     private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1192 
1193     WriteRequestCostFunction(Configuration conf) {
1194       super(conf);
1195       this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1196     }
1197 
1198     @Override
1199     protected double getCostFromRl(RegionLoad rl) {
1200       return rl.getWriteRequestsCount();
1201     }
1202   }
1203 
1204   /**
1205    * A cost function for region replicas. We give a very high cost to hosting
1206    * replicas of the same region in the same host. We do not prevent the case
1207    * though, since if numReplicas > numRegionServers, we still want to keep the
1208    * replica open.
1209    */
1210   static class RegionReplicaHostCostFunction extends CostFunction {
1211     private static final String REGION_REPLICA_HOST_COST_KEY =
1212         "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1213     private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1214 
1215     long maxCost = 0;
1216     long[] costsPerGroup; // group is either server, host or rack
1217     int[][] primariesOfRegionsPerGroup;
1218 
1219     public RegionReplicaHostCostFunction(Configuration conf) {
1220       super(conf);
1221       this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1222         DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1223     }
1224 
1225     @Override
1226     void init(Cluster cluster) {
1227       super.init(cluster);
1228       // max cost is the case where every region replica is hosted together regardless of host
1229       maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1230       costsPerGroup = new long[cluster.numHosts];
1231       primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1232           ? cluster.primariesOfRegionsPerHost
1233           : cluster.primariesOfRegionsPerServer;
1234       for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1235         costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1236       }
1237     }
1238 
1239     long getMaxCost(Cluster cluster) {
1240       if (!cluster.hasRegionReplicas) {
1241         return 0; // short circuit
1242       }
1243       // max cost is the case where every region replica is hosted together regardless of host
1244       int[] primariesOfRegions = new int[cluster.numRegions];
1245       System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1246           cluster.regions.length);
1247 
1248       Arrays.sort(primariesOfRegions);
1249 
1250       // compute numReplicas from the sorted array
1251       return costPerGroup(primariesOfRegions);
1252     }
1253 
1254     @Override
1255     double cost() {
1256       if (maxCost <= 0) {
1257         return 0;
1258       }
1259 
1260       long totalCost = 0;
1261       for (int i = 0 ; i < costsPerGroup.length; i++) {
1262         totalCost += costsPerGroup[i];
1263       }
1264       return scale(0, maxCost, totalCost);
1265     }
1266 
1267     /**
1268      * For each primary region, it computes the total number of replicas in the array (numReplicas)
1269      * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1270      * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1271      * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1272      * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1273      * @return a sum of numReplicas-1 squared for each primary region in the group.
1274      */
1275     protected long costPerGroup(int[] primariesOfRegions) {
1276       long cost = 0;
1277       int currentPrimary = -1;
1278       int currentPrimaryIndex = -1;
1279       // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1280       // sharing the same primary will have consecutive numbers in the array.
1281       for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1282         int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1283         if (primary != currentPrimary) { // we see a new primary
1284           int numReplicas = j - currentPrimaryIndex;
1285           // square the cost
1286           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1287             cost += (numReplicas - 1) * (numReplicas - 1);
1288           }
1289           currentPrimary = primary;
1290           currentPrimaryIndex = j;
1291         }
1292       }
1293 
1294       return cost;
1295     }
1296 
1297     @Override
1298     protected void regionMoved(int region, int oldServer, int newServer) {
1299       if (maxCost <= 0) {
1300         return; // no need to compute
1301       }
1302       if (cluster.multiServersPerHost) {
1303         int oldHost = cluster.serverIndexToHostIndex[oldServer];
1304         int newHost = cluster.serverIndexToHostIndex[newServer];
1305         if (newHost != oldHost) {
1306           costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1307           costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1308         }
1309       } else {
1310         costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1311         costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1312       }
1313     }
1314   }
1315 
1316   /**
1317    * A cost function for region replicas for the rack distribution. We give a relatively high
1318    * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1319    * though.
1320    */
1321   static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1322     private static final String REGION_REPLICA_RACK_COST_KEY =
1323         "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1324     private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1325 
1326     public RegionReplicaRackCostFunction(Configuration conf) {
1327       super(conf);
1328       this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1329     }
1330 
1331     @Override
1332     void init(Cluster cluster) {
1333       this.cluster = cluster;
1334       if (cluster.numRacks <= 1) {
1335         maxCost = 0;
1336         return; // disabled for 1 rack
1337       }
1338       // max cost is the case where every region replica is hosted together regardless of rack
1339       maxCost = getMaxCost(cluster);
1340       costsPerGroup = new long[cluster.numRacks];
1341       for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1342         costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1343       }
1344     }
1345 
1346     @Override
1347     protected void regionMoved(int region, int oldServer, int newServer) {
1348       if (maxCost <= 0) {
1349         return; // no need to compute
1350       }
1351       int oldRack = cluster.serverIndexToRackIndex[oldServer];
1352       int newRack = cluster.serverIndexToRackIndex[newServer];
1353       if (newRack != oldRack) {
1354         costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1355         costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1356       }
1357     }
1358   }
1359 
1360   /**
1361    * Compute the cost of total memstore size.  The more unbalanced the higher the
1362    * computed cost will be.  This uses a rolling average of regionload.
1363    */
1364   static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
1365 
1366     private static final String MEMSTORE_SIZE_COST_KEY =
1367         "hbase.master.balancer.stochastic.memstoreSizeCost";
1368     private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1369 
1370     MemstoreSizeCostFunction(Configuration conf) {
1371       super(conf);
1372       this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1373     }
1374 
1375     @Override
1376     protected double getCostFromRl(RegionLoad rl) {
1377       return rl.getMemStoreSizeMB();
1378     }
1379   }
1380   /**
1381    * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1382    * computed cost will be.  This uses a rolling average of regionload.
1383    */
1384   static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1385 
1386     private static final String STOREFILE_SIZE_COST_KEY =
1387         "hbase.master.balancer.stochastic.storefileSizeCost";
1388     private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1389 
1390     StoreFileCostFunction(Configuration conf) {
1391       super(conf);
1392       this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1393     }
1394 
1395     @Override
1396     protected double getCostFromRl(RegionLoad rl) {
1397       return rl.getStorefileSizeMB();
1398     }
1399   }
1400 }