View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayDeque;
21  import java.util.Collection;
22  import java.util.Deque;
23  import java.util.HashMap;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Map.Entry;
28  import java.util.Random;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.ClusterStatus;
36  import org.apache.hadoop.hbase.HRegionInfo;
37  import org.apache.hadoop.hbase.RegionLoad;
38  import org.apache.hadoop.hbase.ServerLoad;
39  import org.apache.hadoop.hbase.ServerName;
40  import org.apache.hadoop.hbase.master.MasterServices;
41  import org.apache.hadoop.hbase.master.RegionPlan;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
44  import org.apache.hadoop.hbase.util.Pair;
45  
46  /**
47   * <p>This is a best effort load balancer. Given a cost function F(C) => x, it will
48   * randomly try to mutate the cluster C into Cprime. If F(Cprime) < F(C) then the
49   * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
50   * <ul>
51   * <li>Region Load</li>
52   * <li>Table Load</li>
53   * <li>Data Locality</li>
54   * <li>Memstore Sizes</li>
55   * <li>Storefile Sizes</li>
56   * </ul>
57   *
58   *
59   * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
60   * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
61   * scaled by their respective multipliers:</p>
62   *
63   * <ul>
64   *   <li>hbase.master.balancer.stochastic.regionCountCost</li>
65   *   <li>hbase.master.balancer.stochastic.moveCost</li>
66   *   <li>hbase.master.balancer.stochastic.tableSkewCost</li>
67   *   <li>hbase.master.balancer.stochastic.localityCost</li>
68   *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
69   *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
70   * </ul>
71   *
72   * <p>In addition to the above configurations, the balancer can be tuned by the following
73   * configuration values:</p>
74   * <ul>
75   *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
76   *   controls what the max number of regions that can be moved in a single invocation of this
77   *   balancer.</li>
78   *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
79   *   regions is multiplied to try and get the number of times the balancer will
80   *   mutate all servers.</li>
81   *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
82   *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
83   *   value and the above computation.</li>
84   * </ul>
85   *
86   * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
87   * so that the balancer gets the full picture of all loads on the cluster.</p>
88   */
89  @InterfaceAudience.Private
90  public class StochasticLoadBalancer extends BaseLoadBalancer {
91  
  /** Configuration key for {@link #stepsPerRegion}. */
  private static final String STEPS_PER_REGION_KEY =
      "hbase.master.balancer.stochastic.stepsPerRegion";
  /** Configuration key for {@link #maxSteps}. */
  private static final String MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.maxSteps";
  /** Configuration key for {@link #maxRunningTime}. */
  private static final String MAX_RUNNING_TIME_KEY =
      "hbase.master.balancer.stochastic.maxRunningTime";
  /** Configuration key for {@link #numRegionLoadsToRemember}. */
  private static final String KEEP_REGION_LOADS =
      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";

  /** Random source shared by the balancing walk and the pickers; seeded from the wall clock. */
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);

  private final RegionLocationFinder regionFinder = new RegionLocationFinder();
  // Last status handed to setClusterStatus(); null until then.
  private ClusterStatus clusterStatus = null;
  // RegionLoad samples per region, keyed by the string form of the keys of
  // ServerLoad#getRegionsLoad(); replaced with a fresh map by updateRegionLoad().
  Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();

  // values are defaults
  // Upper bound on the number of iterations of the walk in balanceCluster().
  private int maxSteps = 1000000;
  // Multiplied by the region count and the server count to size the walk (capped by maxSteps).
  private int stepsPerRegion = 800;
  // Time budget, in milliseconds, for the walk in balanceCluster().
  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
  // Value read from KEEP_REGION_LOADS in setConf().
  private int numRegionLoadsToRemember = 15;

  // Candidate generators; one is chosen at random for every step of the walk.
  private RegionPicker[] pickers;
  // The region-load based cost functions; they also appear in costFunctions.
  private CostFromRegionLoadFunction[] regionLoadFunctions;
  // All cost functions summed (weighted by their multipliers) in computeCost().
  private CostFunction[] costFunctions;
  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedPicker localityPicker;
  private LocalityCostFunction localityCost;
121 
122   @Override
123   public void setConf(Configuration conf) {
124     super.setConf(conf);
125 
126     regionFinder.setConf(conf);
127 
128     maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
129 
130     stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
131     maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
132 
133     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
134 
135     localityPicker = new LocalityBasedPicker(services);
136     localityCost = new LocalityCostFunction(conf, services);
137 
138     pickers = new RegionPicker[] {
139       new RandomRegionPicker(),
140       new LoadPicker(),
141       localityPicker
142     };
143 
144     regionLoadFunctions = new CostFromRegionLoadFunction[] {
145       new ReadRequestCostFunction(conf),
146       new WriteRequestCostFunction(conf),
147       new MemstoreSizeCostFunction(conf),
148       new StoreFileCostFunction(conf)
149     };
150 
151     costFunctions = new CostFunction[]{
152       new RegionCountSkewCostFunction(conf),
153       new MoveCostFunction(conf),
154       localityCost,
155       new TableSkewCostFunction(conf),
156       regionLoadFunctions[0],
157       regionLoadFunctions[1],
158       regionLoadFunctions[2],
159       regionLoadFunctions[3],
160     };
161   }
162 
163   @Override
164   protected void setSlop(Configuration conf) {
165     this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
166   }
167 
168   @Override
169   public void setClusterStatus(ClusterStatus st) {
170     super.setClusterStatus(st);
171     regionFinder.setClusterStatus(st);
172     this.clusterStatus = st;
173     updateRegionLoad();
174     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
175       cost.setClusterStatus(st);
176     }
177   }
178 
179   @Override
180   public void setMasterServices(MasterServices masterServices) {
181     super.setMasterServices(masterServices);
182     this.regionFinder.setServices(masterServices);
183     this.localityCost.setServices(masterServices);
184     this.localityPicker.setServices(masterServices);
185 
186   }
187 
188   /**
189    * Given the cluster state this will try and approach an optimal balance. This
190    * should always approach the optimal state given enough steps.
191    */
192   @Override
193   public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
194     if (!needsBalance(new ClusterLoadState(clusterState))) {
195       return null;
196     }
197 
198     long startTime = EnvironmentEdgeManager.currentTimeMillis();
199 
200     // Keep track of servers to iterate through them.
201     Cluster cluster = new Cluster(clusterState, loads, regionFinder);
202     double currentCost = computeCost(cluster, Double.MAX_VALUE);
203 
204     double initCost = currentCost;
205     double newCost = currentCost;
206 
207     long computedMaxSteps = Math.min(this.maxSteps,
208         ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
209     // Perform a stochastic walk to see if we can get a good fit.
210     long step;
211     for (step = 0; step < computedMaxSteps; step++) {
212       int pickerIdx = RANDOM.nextInt(pickers.length);
213       RegionPicker p = pickers[pickerIdx];
214       Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);
215 
216       int leftServer = picks.getFirst().getFirst();
217       int leftRegion = picks.getFirst().getSecond();
218       int rightServer = picks.getSecond().getFirst();
219       int rightRegion = picks.getSecond().getSecond();
220 
221       // We couldn't find a server
222       if (rightServer < 0 || leftServer < 0) {
223         continue;
224       }
225 
226       // We randomly picked to do nothing.
227       if (leftRegion < 0 && rightRegion < 0) {
228         continue;
229       }
230 
231       cluster.moveOrSwapRegion(leftServer,
232           rightServer,
233           leftRegion,
234           rightRegion);
235 
236       newCost = computeCost(cluster, currentCost);
237       // Should this be kept?
238       if (newCost < currentCost) {
239         currentCost = newCost;
240       } else {
241         // Put things back the way they were before.
242         // TODO: undo by remembering old values, using an UndoAction class
243         cluster.moveOrSwapRegion(leftServer,
244             rightServer,
245             rightRegion,
246             leftRegion);
247       }
248 
249       if (EnvironmentEdgeManager.currentTimeMillis() - startTime >
250           maxRunningTime) {
251         break;
252       }
253     }
254 
255     long endTime = EnvironmentEdgeManager.currentTimeMillis();
256 
257     metricsBalancer.balanceCluster(endTime - startTime);
258 
259     if (initCost > currentCost) {
260       List<RegionPlan> plans = createRegionPlans(cluster);
261       if (LOG.isDebugEnabled()) {
262         LOG.debug("Finished computing new load balance plan.  Computation took "
263             + (endTime - startTime) + "ms to try " + step
264             + " different iterations.  Found a solution that moves "
265             + plans.size() + " regions; Going from a computed cost of "
266             + initCost + " to a new cost of " + currentCost);
267       }
268       return plans;
269     }
270     if (LOG.isDebugEnabled()) {
271       LOG.debug("Could not find a better load balance plan.  Tried "
272           + step + " different configurations in " + (endTime - startTime)
273           + "ms, and did not find anything with a computed cost less than " + initCost);
274     }
275     return null;
276   }
277 
278   /**
279    * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
280    * state.
281    *
282    * @param cluster The state of the cluster
283    * @return List of RegionPlan's that represent the moves needed to get to desired final state.
284    */
285   private List<RegionPlan> createRegionPlans(Cluster cluster) {
286     List<RegionPlan> plans = new LinkedList<RegionPlan>();
287     for (int regionIndex = 0;
288          regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
289       int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
290       int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
291 
292       if (initialServerIndex != newServerIndex) {
293         HRegionInfo region = cluster.regions[regionIndex];
294         ServerName initialServer = cluster.servers[initialServerIndex];
295         ServerName newServer = cluster.servers[newServerIndex];
296 
297         if (LOG.isTraceEnabled()) {
298           LOG.trace("Moving Region " + region.getEncodedName() + " from server "
299               + initialServer.getHostname() + " to " + newServer.getHostname());
300         }
301         RegionPlan rp = new RegionPlan(region, initialServer, newServer);
302         plans.add(rp);
303       }
304     }
305     return plans;
306   }
307 
308   /**
309    * Store the current region loads.
310    */
311   private synchronized void updateRegionLoad() {
312     // We create a new hashmap so that regions that are no longer there are removed.
313     // However we temporarily need the old loads so we can use them to keep the rolling average.
314     Map<String, Deque<RegionLoad>> oldLoads = loads;
315     loads = new HashMap<String, Deque<RegionLoad>>();
316 
317     for (ServerName sn : clusterStatus.getServers()) {
318       ServerLoad sl = clusterStatus.getLoad(sn);
319       if (sl == null) {
320         continue;
321       }
322       for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
323         Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
324         if (rLoads == null) {
325           // There was nothing there
326           rLoads = new ArrayDeque<RegionLoad>();
327         } else if (rLoads.size() >= 15) {
328           rLoads.remove();
329         }
330         rLoads.add(entry.getValue());
331         loads.put(Bytes.toString(entry.getKey()), rLoads);
332 
333       }
334     }
335 
336     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
337       cost.setLoads(loads);
338     }
339   }
340 
341 
342   /**
343    * This is the main cost function.  It will compute a cost associated with a proposed cluster
344    * state.  All different costs will be combined with their multipliers to produce a double cost.
345    *
346    * @param cluster The state of the cluster
347    * @param previousCost the previous cost. This is used as an early out.
348    * @return a double of a cost associated with the proposed cluster state.  This cost is an
349    *         aggregate of all individual cost functions.
350    */
351   protected double computeCost(Cluster cluster, double previousCost) {
352     double total = 0;
353 
354     for (CostFunction c:costFunctions) {
355       if (c.getMultiplier() <= 0) {
356         continue;
357       }
358 
359       total += c.getMultiplier() * c.cost(cluster);
360 
361       if (total > previousCost) {
362         return total;
363       }
364     }
365     return total;
366   }
367 
368   abstract static class RegionPicker {
369     abstract Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster);
370 
371     /**
372      * From a list of regions pick a random one. Null can be returned which
373      * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
374      * rather than swap.
375      *
376      * @param cluster        The state of the cluster
377      * @param server         index of the server
378      * @param chanceOfNoSwap Chance that this will decide to try a move rather
379      *                       than a swap.
380      * @return a random {@link HRegionInfo} or null if an asymmetrical move is
381      *         suggested.
382      */
383     protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
384       // Check to see if this is just a move.
385       if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
386         // signal a move only.
387         return -1;
388       }
389       int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
390       return cluster.regionsPerServer[server][rand];
391 
392     }
393     protected int pickRandomServer(Cluster cluster) {
394       if (cluster.numServers < 1) {
395         return -1;
396       }
397 
398       return RANDOM.nextInt(cluster.numServers);
399     }
400     protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
401       if (cluster.numServers < 2) {
402         return -1;
403       }
404       while (true) {
405         int otherServerIndex = pickRandomServer(cluster);
406         if (otherServerIndex != serverIndex) {
407           return otherServerIndex;
408         }
409       }
410     }
411 
412     protected Pair<Integer, Integer> pickRandomRegions(Cluster cluster,
413                                                        int thisServer,
414                                                        int otherServer) {
415       if (thisServer < 0 || otherServer < 0) {
416         return new Pair<Integer, Integer>(-1, -1);
417       }
418 
419       // Decide who is most likely to need another region
420       int thisRegionCount = cluster.getNumRegions(thisServer);
421       int otherRegionCount = cluster.getNumRegions(otherServer);
422 
423       // Assign the chance based upon the above
424       double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
425       double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
426 
427       int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
428       int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
429 
430       return new Pair<Integer, Integer>(thisRegion, otherRegion);
431     }
432   }
433 
434   static class RandomRegionPicker extends RegionPicker {
435 
436     @Override
437     Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
438 
439       int thisServer = pickRandomServer(cluster);
440 
441       // Pick the other server
442       int otherServer = pickOtherRandomServer(cluster, thisServer);
443 
444       Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
445 
446       return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
447           new Pair<Integer, Integer>(thisServer, regions.getFirst()),
448           new Pair<Integer, Integer>(otherServer, regions.getSecond())
449 
450       );
451     }
452 
453   }
454 
455   public static class LoadPicker extends RegionPicker {
456 
457     @Override
458     Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
459       cluster.sortServersByRegionCount();
460       int thisServer = pickMostLoadedServer(cluster, -1);
461       int otherServer = pickLeastLoadedServer(cluster, thisServer);
462 
463       Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
464       return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
465           new Pair<Integer, Integer>(thisServer, regions.getFirst()),
466           new Pair<Integer, Integer>(otherServer, regions.getSecond())
467 
468       );
469     }
470 
471     private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
472       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
473 
474       int index = 0;
475       while (servers[index] == null || servers[index] == thisServer) {
476         index++;
477         if (index == servers.length) {
478           return -1;
479         }
480       }
481       return servers[index];
482     }
483 
484     private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
485       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
486 
487       int index = servers.length - 1;
488       while (servers[index] == null || servers[index] == thisServer) {
489         index--;
490         if (index < 0) {
491           return -1;
492         }
493       }
494       return servers[index];
495     }
496   }
497 
498   static class LocalityBasedPicker extends RegionPicker {
499 
500     private MasterServices masterServices;
501 
502     LocalityBasedPicker(MasterServices masterServices) {
503       this.masterServices = masterServices;
504     }
505 
506     @Override
507     Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
508       if (this.masterServices == null) {
509         return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
510             new Pair<Integer, Integer>(-1,-1),
511             new Pair<Integer, Integer>(-1,-1)
512         );
513       }
514       // Pick a random region server
515       int thisServer = pickRandomServer(cluster);
516 
517       // Pick a random region on this server
518       int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
519 
520       if (thisRegion == -1) {
521         return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
522             new Pair<Integer, Integer>(-1,-1),
523             new Pair<Integer, Integer>(-1,-1)
524         );
525       }
526 
527       // Pick the server with the highest locality
528       int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion);
529 
530       // pick an region on the other server to potentially swap
531       int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
532 
533       return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
534           new Pair<Integer, Integer>(thisServer,thisRegion),
535           new Pair<Integer, Integer>(otherServer,otherRegion)
536       );
537     }
538 
539     private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
540       int[] regionLocations = cluster.regionLocations[thisRegion];
541 
542       if (regionLocations == null || regionLocations.length <= 1) {
543         return pickOtherRandomServer(cluster, thisServer);
544       }
545 
546       for (int loc : regionLocations) {
547         if (loc >= 0 && loc != thisServer) { // find the first suitable server
548           return loc;
549         }
550       }
551 
552       // no location found
553       return pickOtherRandomServer(cluster, thisServer);
554     }
555 
556     void setServices(MasterServices services) {
557       this.masterServices = services;
558     }
559   }
560 
561   /**
562    * Base class of StochasticLoadBalancer's Cost Functions.
563    */
564   public abstract static class CostFunction {
565 
566     private float multiplier = 0;
567     private Configuration conf;
568 
569     CostFunction(Configuration c) {
570       this.conf = c;
571     }
572 
573     float getMultiplier() {
574       return multiplier;
575     }
576 
577     void setMultiplier(float m) {
578       this.multiplier = m;
579     }
580 
581     abstract double cost(Cluster cluster);
582 
583     /**
584      * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
585      * assumes that this is a zero sum set of costs.  It assumes that the worst case
586      * possible is all of the elements in one region server and the rest having 0.
587      *
588      * @param stats the costs
589      * @return a scaled set of costs.
590      */
591     protected double costFromArray(double[] stats) {
592       double totalCost = 0;
593       double total = getSum(stats);
594       double mean = total/((double)stats.length);
595       double count = stats.length;
596 
597       // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
598       // a zero sum cost for this to make sense.
599       // TODO: Should we make this sum of square errors?
600       double max = ((count - 1) * mean) + (total - mean);
601       for (double n : stats) {
602         double diff = Math.abs(mean - n);
603         totalCost += diff;
604       }
605 
606       double scaled =  scale(0, max, totalCost);
607       return scaled;
608     }
609 
610 
611 
612     private double getSum(double[] stats) {
613       double total = 0;
614       for(double s:stats) {
615         total += s;
616       }
617       return total;
618     }
619 
620     /**
621      * Scale the value between 0 and 1.
622      *
623      * @param min   Min value
624      * @param max   The Max value
625      * @param value The value to be scaled.
626      * @return The scaled value.
627      */
628     protected double scale(double min, double max, double value) {
629       if (max == 0 || value == 0) {
630         return 0;
631       }
632 
633       return Math.max(0d, Math.min(1d, (value - min) / max));
634     }
635   }
636 
637   /**
638    * Given the starting state of the regions and a potential ending state
639    * compute cost based upon the number of regions that have moved.
640    */
641   public static class MoveCostFunction extends CostFunction {
642     private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
643     private static final String MAX_MOVES_PERCENT_KEY =
644         "hbase.master.balancer.stochastic.maxMovePercent";
645     private static final float DEFAULT_MOVE_COST = 100;
646     private static final int DEFAULT_MAX_MOVES = 600;
647     private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
648     private static final int META_MOVE_COST_MULT = 10;
649 
650     private final float maxMovesPercent;
651 
652     MoveCostFunction(Configuration conf) {
653       super(conf);
654 
655       // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
656       // that large benefits are need to overcome the cost of a move.
657       this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
658       // What percent of the number of regions a single run of the balancer can move.
659       maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
660     }
661 
662     @Override
663     double cost(Cluster cluster) {
664       // Try and size the max number of Moves, but always be prepared to move some.
665       int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
666           DEFAULT_MAX_MOVES);
667 
668       double moveCost = cluster.numMovedRegions;
669 
670       // Don't let this single balance move more than the max moves.
671       // This allows better scaling to accurately represent the actual cost of a move.
672       if (moveCost > maxMoves) {
673         return 1000000;   // return a number much greater than any of the other cost
674       }
675 
676       // hbase:meta region is special
677       if (cluster.numMovedMetaRegions > 0) {
678         // assume each hbase:meta region move costs 10 times
679         moveCost += META_MOVE_COST_MULT * cluster.numMovedMetaRegions;
680       }
681 
682       return scale(0, cluster.numRegions + META_MOVE_COST_MULT, moveCost);
683     }
684   }
685 
686   /**
687    * Compute the cost of a potential cluster state from skew in number of
688    * regions on a cluster.
689    */
690   public static class RegionCountSkewCostFunction extends CostFunction {
691     private static final String REGION_COUNT_SKEW_COST_KEY =
692         "hbase.master.balancer.stochastic.regionCountCost";
693     private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
694 
695     private double[] stats = null;
696 
697     RegionCountSkewCostFunction(Configuration conf) {
698       super(conf);
699       // Load multiplier should be the greatest as it is the most general way to balance data.
700       this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
701     }
702 
703     @Override
704     double cost(Cluster cluster) {
705       if (stats == null || stats.length != cluster.numServers) {
706         stats = new double[cluster.numServers];
707       }
708 
709       for (int i =0; i < cluster.numServers; i++) {
710         stats[i] = cluster.regionsPerServer[i].length;
711       }
712       return costFromArray(stats);
713     }
714   }
715 
716   /**
717    * Compute the cost of a potential cluster configuration based upon how evenly
718    * distributed tables are.
719    */
720   public static class TableSkewCostFunction extends CostFunction {
721 
722     private static final String TABLE_SKEW_COST_KEY =
723         "hbase.master.balancer.stochastic.tableSkewCost";
724     private static final float DEFAULT_TABLE_SKEW_COST = 35;
725 
726     TableSkewCostFunction(Configuration conf) {
727       super(conf);
728       this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
729     }
730 
731     @Override
732     double cost(Cluster cluster) {
733       double max = cluster.numRegions;
734       double min = ((double) cluster.numRegions) / cluster.numServers;
735       double value = 0;
736 
737       for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
738         value += cluster.numMaxRegionsPerTable[i];
739       }
740 
741       return scale(min, max, value);
742     }
743   }
744 
745 
746   /**
747    * Compute a cost of a potential cluster configuration based upon where
748    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
749    */
750   public static class LocalityCostFunction extends CostFunction {
751 
752     private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
753     private static final float DEFAULT_LOCALITY_COST = 25;
754 
755     private MasterServices services;
756 
757     LocalityCostFunction(Configuration conf, MasterServices srv) {
758       super(conf);
759       this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
760       this.services = srv;
761     }
762 
763     void setServices(MasterServices srvc) {
764       this.services = srvc;
765     }
766 
767     @Override
768     double cost(Cluster cluster) {
769       double max = 0;
770       double cost = 0;
771 
772       // If there's no master so there's no way anything else works.
773       if (this.services == null) {
774         return cost;
775       }
776 
777       for (int i = 0; i < cluster.regionLocations.length; i++) {
778         max += 1;
779         int serverIndex = cluster.regionIndexToServerIndex[i];
780         int[] regionLocations = cluster.regionLocations[i];
781 
782         // If we can't find where the data is getTopBlock returns null.
783         // so count that as being the best possible.
784         if (regionLocations == null) {
785           continue;
786         }
787 
788         int index = -1;
789         for (int j = 0; j < regionLocations.length; j++) {
790           if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
791             index = j;
792             break;
793           }
794         }
795 
796         if (index < 0) {
797           cost += 1;
798         } else {
799           cost += (double) index / (double) regionLocations.length;
800         }
801       }
802       return scale(0, max, cost);
803     }
804   }
805 
806   /**
807    * Base class the allows writing costs functions from rolling average of some
808    * number from RegionLoad.
809    */
810   public abstract static class CostFromRegionLoadFunction extends CostFunction {
811 
812     private ClusterStatus clusterStatus = null;
813     private Map<String, Deque<RegionLoad>> loads = null;
814     private double[] stats = null;
815     CostFromRegionLoadFunction(Configuration conf) {
816       super(conf);
817     }
818 
819     void setClusterStatus(ClusterStatus status) {
820       this.clusterStatus = status;
821     }
822 
823     void setLoads(Map<String, Deque<RegionLoad>> l) {
824       this.loads = l;
825     }
826 
827 
828     @Override
829     double cost(Cluster cluster) {
830       if (clusterStatus == null || loads == null) {
831         return 0;
832       }
833 
834       if (stats == null || stats.length != cluster.numServers) {
835         stats = new double[cluster.numServers];
836       }
837 
838       for (int i =0; i < stats.length; i++) {
839         //Cost this server has from RegionLoad
840         long cost = 0;
841 
842         // for every region on this server get the rl
843         for(int regionIndex:cluster.regionsPerServer[i]) {
844           Collection<RegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
845 
846           // Now if we found a region load get the type of cost that was requested.
847           if (regionLoadList != null) {
848             cost += getRegionLoadCost(regionLoadList);
849           }
850         }
851 
852         // Add the total cost to the stats.
853         stats[i] = cost;
854       }
855 
856       // Now return the scaled cost from data held in the stats object.
857       return costFromArray(stats);
858     }
859 
860     protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
861       double cost = 0;
862 
863       for (RegionLoad rl : regionLoadList) {
864         double toAdd = getCostFromRl(rl);
865 
866         if (cost == 0) {
867           cost = toAdd;
868         } else {
869           cost = (.5 * cost) + (.5 * toAdd);
870         }
871       }
872 
873       return cost;
874     }
875 
876     protected abstract double getCostFromRl(RegionLoad rl);
877   }
878 
879   /**
880    * Compute the cost of total number of read requests  The more unbalanced the higher the
881    * computed cost will be.  This uses a rolling average of regionload.
882    */
883 
884   public static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
885 
886     private static final String READ_REQUEST_COST_KEY =
887         "hbase.master.balancer.stochastic.readRequestCost";
888     private static final float DEFAULT_READ_REQUEST_COST = 5;
889 
890     ReadRequestCostFunction(Configuration conf) {
891       super(conf);
892       this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
893     }
894 
895 
896     @Override
897     protected double getCostFromRl(RegionLoad rl) {
898       return rl.getReadRequestsCount();
899     }
900   }
901 
902   /**
903    * Compute the cost of total number of write requests.  The more unbalanced the higher the
904    * computed cost will be.  This uses a rolling average of regionload.
905    */
906   public static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
907 
908     private static final String WRITE_REQUEST_COST_KEY =
909         "hbase.master.balancer.stochastic.writeRequestCost";
910     private static final float DEFAULT_WRITE_REQUEST_COST = 5;
911 
912     WriteRequestCostFunction(Configuration conf) {
913       super(conf);
914       this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
915     }
916 
917     @Override
918     protected double getCostFromRl(RegionLoad rl) {
919       return rl.getWriteRequestsCount();
920     }
921   }
922 
923   /**
924    * Compute the cost of total memstore size.  The more unbalanced the higher the
925    * computed cost will be.  This uses a rolling average of regionload.
926    */
927   public static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
928 
929     private static final String MEMSTORE_SIZE_COST_KEY =
930         "hbase.master.balancer.stochastic.memstoreSizeCost";
931     private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
932 
933     MemstoreSizeCostFunction(Configuration conf) {
934       super(conf);
935       this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
936     }
937 
938     @Override
939     protected double getCostFromRl(RegionLoad rl) {
940       return rl.getMemStoreSizeMB();
941     }
942   }
943   /**
944    * Compute the cost of total open storefiles size.  The more unbalanced the higher the
945    * computed cost will be.  This uses a rolling average of regionload.
946    */
947   public static class StoreFileCostFunction extends CostFromRegionLoadFunction {
948 
949     private static final String STOREFILE_SIZE_COST_KEY =
950         "hbase.master.balancer.stochastic.storefileSizeCost";
951     private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
952 
953     StoreFileCostFunction(Configuration conf) {
954       super(conf);
955       this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
956     }
957 
958     @Override
959     protected double getCostFromRl(RegionLoad rl) {
960       return rl.getStorefileSizeMB();
961     }
962   }
963 }