View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayDeque;
21  import java.util.Collection;
22  import java.util.Deque;
23  import java.util.HashMap;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Map.Entry;
28  import java.util.Random;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.ClusterStatus;
36  import org.apache.hadoop.hbase.HRegionInfo;
37  import org.apache.hadoop.hbase.RegionLoad;
38  import org.apache.hadoop.hbase.ServerLoad;
39  import org.apache.hadoop.hbase.ServerName;
40  import org.apache.hadoop.hbase.master.MasterServices;
41  import org.apache.hadoop.hbase.master.RegionPlan;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
44  import org.apache.hadoop.hbase.util.Pair;
45  
46  /**
 * <p>This is a best effort load balancer. Given a cost function F(C) => x, it will
 * randomly try to mutate the cluster to Cprime. If F(Cprime) < F(C) then the
 * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
50   * <ul>
51   * <li>Region Load</li>
52   * <li>Table Load</li>
53   * <li>Data Locality</li>
54   * <li>Memstore Sizes</li>
55   * <li>Storefile Sizes</li>
56   * </ul>
57   *
58   *
59   * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
60   * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
61   * scaled by their respective multipliers:</p>
62   *
63   * <ul>
 *   <li>hbase.master.balancer.stochastic.regionCountCost</li>
65   *   <li>hbase.master.balancer.stochastic.moveCost</li>
 *   <li>hbase.master.balancer.stochastic.tableSkewCost</li>
67   *   <li>hbase.master.balancer.stochastic.localityCost</li>
68   *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
69   *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
70   * </ul>
71   *
72   * <p>In addition to the above configurations, the balancer can be tuned by the following
73   * configuration values:</p>
74   * <ul>
 *   <li>hbase.master.balancer.stochastic.maxMovePercent which
 *   controls the maximum fraction of regions that can be moved in a single invocation of this
 *   balancer (the balancer always allows at least 600 moves).</li>
78   *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
79   *   regions is multiplied to try and get the number of times the balancer will
80   *   mutate all servers.</li>
81   *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
82   *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
83   *   value and the above computation.</li>
84   * </ul>
85   *
86   * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
87   * so that the balancer gets the full picture of all loads on the cluster.</p>
88   */
89  @InterfaceAudience.Private
90  public class StochasticLoadBalancer extends BaseLoadBalancer {
91  
  // Configuration key read into stepsPerRegion by setConf.
  private static final String STEPS_PER_REGION_KEY =
      "hbase.master.balancer.stochastic.stepsPerRegion";
  // Configuration key read into maxSteps by setConf.
  private static final String MAX_STEPS_KEY =
      "hbase.master.balancer.stochastic.maxSteps";
  // Configuration key read into maxRunningTime by setConf.
  private static final String MAX_RUNNING_TIME_KEY =
      "hbase.master.balancer.stochastic.maxRunningTime";
  // Configuration key read into numRegionLoadsToRemember by setConf.
  private static final String KEEP_REGION_LOADS =
      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";

  // Single random source shared by the balancer and all pickers; seeded from the wall clock.
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);

  private final RegionLocationFinder regionFinder = new RegionLocationFinder();
  // Recent RegionLoad samples per region, keyed by the region name converted to a String.
  // Rebuilt by updateRegionLoad() on every cluster status update.
  Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();

  // values are defaults
  // Hard upper bound on the number of steps of one balanceCluster walk.
  private int maxSteps = 1000000;
  // Multiplied by the region count and server count to size the walk (capped by maxSteps).
  private int stepsPerRegion = 800;
  // Wall-clock budget of one walk, in milliseconds.
  private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
  // Intended number of RegionLoad samples retained per region.
  private int numRegionLoadsToRemember = 15;

  // Candidate generators; balanceCluster picks one uniformly at random per step.
  private RegionPicker[] pickers;
  // The subset of costFunctions that needs cluster status and region load updates.
  private CostFromRegionLoadFunction[] regionLoadFunctions;
  // All cost functions summed (weighted by their multipliers) in computeCost.
  private CostFunction[] costFunctions;
  // Keep locality based picker and cost function to alert them
  // when new services are offered
  private LocalityBasedPicker localityPicker;
  private LocalityCostFunction localityCost;
120 
121   @Override
122   public void setConf(Configuration conf) {
123     super.setConf(conf);
124 
125     regionFinder.setConf(conf);
126 
127     maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
128 
129     stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
130     maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
131 
132     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
133 
134     localityPicker = new LocalityBasedPicker(services);
135     localityCost = new LocalityCostFunction(conf, services);
136 
137     pickers = new RegionPicker[] {
138       new RandomRegionPicker(),
139       new LoadPicker(),
140       localityPicker
141     };
142 
143     regionLoadFunctions = new CostFromRegionLoadFunction[] {
144       new ReadRequestCostFunction(conf),
145       new WriteRequestCostFunction(conf),
146       new MemstoreSizeCostFunction(conf),
147       new StoreFileCostFunction(conf)
148     };
149 
150     costFunctions = new CostFunction[]{
151       new RegionCountSkewCostFunction(conf, activeMasterWeight, backupMasterWeight),
152       new MoveCostFunction(conf),
153       localityCost,
154       new TableSkewCostFunction(conf),
155       regionLoadFunctions[0],
156       regionLoadFunctions[1],
157       regionLoadFunctions[2],
158       regionLoadFunctions[3],
159     };
160   }
161 
162   @Override
163   protected void setSlop(Configuration conf) {
164     this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
165   }
166 
167   @Override
168   public void setClusterStatus(ClusterStatus st) {
169     super.setClusterStatus(st);
170     regionFinder.setClusterStatus(st);
171     updateRegionLoad();
172     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
173       cost.setClusterStatus(st);
174     }
175   }
176 
177   @Override
178   public void setMasterServices(MasterServices masterServices) {
179     super.setMasterServices(masterServices);
180     this.regionFinder.setServices(masterServices);
181     this.localityCost.setServices(masterServices);
182     this.localityPicker.setServices(masterServices);
183 
184   }
185 
186   /**
187    * Given the cluster state this will try and approach an optimal balance. This
188    * should always approach the optimal state given enough steps.
189    */
190   @Override
191   public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
192     List<RegionPlan> plans = balanceMasterRegions(clusterState);
193     if (plans != null) {
194       return plans;
195     }
196     filterExcludedServers(clusterState);
197     if (!needsBalance(new ClusterLoadState(masterServerName,
198         getBackupMasters(), backupMasterWeight, clusterState))) {
199       return null;
200     }
201 
202     long startTime = EnvironmentEdgeManager.currentTimeMillis();
203 
204     // Keep track of servers to iterate through them.
205     Cluster cluster = new Cluster(masterServerName, clusterState,
206       loads, regionFinder, getBackupMasters(), tablesOnMaster);
207     double currentCost = computeCost(cluster, Double.MAX_VALUE);
208 
209     double initCost = currentCost;
210     double newCost = currentCost;
211 
212     long computedMaxSteps = Math.min(this.maxSteps,
213         ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
214     // Perform a stochastic walk to see if we can get a good fit.
215     long step;
216     for (step = 0; step < computedMaxSteps; step++) {
217       int pickerIdx = RANDOM.nextInt(pickers.length);
218       RegionPicker p = pickers[pickerIdx];
219       Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);
220 
221       int leftServer = picks.getFirst().getFirst();
222       int leftRegion = picks.getFirst().getSecond();
223       int rightServer = picks.getSecond().getFirst();
224       int rightRegion = picks.getSecond().getSecond();
225 
226       // We couldn't find a server
227       if (rightServer < 0 || leftServer < 0) {
228         continue;
229       }
230 
231       // We randomly picked to do nothing.
232       if (leftRegion < 0 && rightRegion < 0) {
233         continue;
234       }
235 
236       cluster.moveOrSwapRegion(leftServer,
237           rightServer,
238           leftRegion,
239           rightRegion);
240 
241       newCost = computeCost(cluster, currentCost);
242       // Should this be kept?
243       if (newCost < currentCost) {
244         currentCost = newCost;
245       } else {
246         // Put things back the way they were before.
247         // TODO: undo by remembering old values, using an UndoAction class
248         cluster.moveOrSwapRegion(leftServer,
249             rightServer,
250             rightRegion,
251             leftRegion);
252       }
253 
254       if (EnvironmentEdgeManager.currentTimeMillis() - startTime >
255           maxRunningTime) {
256         break;
257       }
258     }
259 
260     long endTime = EnvironmentEdgeManager.currentTimeMillis();
261 
262     metricsBalancer.balanceCluster(endTime - startTime);
263 
264     if (initCost > currentCost) {
265       plans = createRegionPlans(cluster);
266       if (LOG.isDebugEnabled()) {
267         LOG.debug("Finished computing new load balance plan.  Computation took "
268             + (endTime - startTime) + "ms to try " + step
269             + " different iterations.  Found a solution that moves "
270             + plans.size() + " regions; Going from a computed cost of "
271             + initCost + " to a new cost of " + currentCost);
272       }
273       return plans;
274     }
275     if (LOG.isDebugEnabled()) {
276       LOG.debug("Could not find a better load balance plan.  Tried "
277           + step + " different configurations in " + (endTime - startTime)
278           + "ms, and did not find anything with a computed cost less than " + initCost);
279     }
280     return null;
281   }
282 
283   /**
284    * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
285    * state.
286    *
287    * @param cluster The state of the cluster
288    * @return List of RegionPlan's that represent the moves needed to get to desired final state.
289    */
290   private List<RegionPlan> createRegionPlans(Cluster cluster) {
291     List<RegionPlan> plans = new LinkedList<RegionPlan>();
292     for (int regionIndex = 0;
293          regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
294       int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
295       int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
296 
297       if (initialServerIndex != newServerIndex) {
298         HRegionInfo region = cluster.regions[regionIndex];
299         ServerName initialServer = cluster.servers[initialServerIndex];
300         ServerName newServer = cluster.servers[newServerIndex];
301 
302         if (LOG.isTraceEnabled()) {
303           LOG.trace("Moving Region " + region.getEncodedName() + " from server "
304               + initialServer.getHostname() + " to " + newServer.getHostname());
305         }
306         RegionPlan rp = new RegionPlan(region, initialServer, newServer);
307         plans.add(rp);
308       }
309     }
310     return plans;
311   }
312 
313   /**
314    * Store the current region loads.
315    */
316   private synchronized void updateRegionLoad() {
317     // We create a new hashmap so that regions that are no longer there are removed.
318     // However we temporarily need the old loads so we can use them to keep the rolling average.
319     Map<String, Deque<RegionLoad>> oldLoads = loads;
320     loads = new HashMap<String, Deque<RegionLoad>>();
321 
322     for (ServerName sn : clusterStatus.getServers()) {
323       ServerLoad sl = clusterStatus.getLoad(sn);
324       if (sl == null) {
325         continue;
326       }
327       for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
328         Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
329         if (rLoads == null) {
330           // There was nothing there
331           rLoads = new ArrayDeque<RegionLoad>();
332         } else if (rLoads.size() >= 15) {
333           rLoads.remove();
334         }
335         rLoads.add(entry.getValue());
336         loads.put(Bytes.toString(entry.getKey()), rLoads);
337 
338       }
339     }
340 
341     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
342       cost.setLoads(loads);
343     }
344   }
345 
346 
347   /**
348    * This is the main cost function.  It will compute a cost associated with a proposed cluster
349    * state.  All different costs will be combined with their multipliers to produce a double cost.
350    *
351    * @param cluster The state of the cluster
352    * @param previousCost the previous cost. This is used as an early out.
353    * @return a double of a cost associated with the proposed cluster state.  This cost is an
354    *         aggregate of all individual cost functions.
355    */
356   protected double computeCost(Cluster cluster, double previousCost) {
357     double total = 0;
358 
359     for (CostFunction c:costFunctions) {
360       if (c.getMultiplier() <= 0) {
361         continue;
362       }
363 
364       total += c.getMultiplier() * c.cost(cluster);
365 
366       if (total > previousCost) {
367         return total;
368       }
369     }
370     return total;
371   }
372 
373   abstract static class RegionPicker {
374     abstract Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster);
375 
376     /**
377      * From a list of regions pick a random one. Null can be returned which
378      * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
379      * rather than swap.
380      *
381      * @param cluster        The state of the cluster
382      * @param server         index of the server
383      * @param chanceOfNoSwap Chance that this will decide to try a move rather
384      *                       than a swap.
385      * @return a random {@link HRegionInfo} or null if an asymmetrical move is
386      *         suggested.
387      */
388     protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
389       // Check to see if this is just a move.
390       if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
391         // signal a move only.
392         return -1;
393       }
394       int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
395       return cluster.regionsPerServer[server][rand];
396 
397     }
398     protected int pickRandomServer(Cluster cluster) {
399       if (cluster.numServers < 1) {
400         return -1;
401       }
402 
403       return RANDOM.nextInt(cluster.numServers);
404     }
405     protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
406       if (cluster.numServers < 2) {
407         return -1;
408       }
409       while (true) {
410         int otherServerIndex = pickRandomServer(cluster);
411         if (otherServerIndex != serverIndex) {
412           return otherServerIndex;
413         }
414       }
415     }
416 
417     protected Pair<Integer, Integer> pickRandomRegions(Cluster cluster,
418                                                        int thisServer,
419                                                        int otherServer) {
420       if (thisServer < 0 || otherServer < 0) {
421         return new Pair<Integer, Integer>(-1, -1);
422       }
423 
424       // Decide who is most likely to need another region
425       int thisRegionCount = cluster.getNumRegions(thisServer);
426       int otherRegionCount = cluster.getNumRegions(otherServer);
427 
428       // Assign the chance based upon the above
429       double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
430       double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
431 
432       int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
433       int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
434 
435       return new Pair<Integer, Integer>(thisRegion, otherRegion);
436     }
437   }
438 
439   static class RandomRegionPicker extends RegionPicker {
440 
441     @Override
442     Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
443 
444       int thisServer = pickRandomServer(cluster);
445 
446       // Pick the other server
447       int otherServer = pickOtherRandomServer(cluster, thisServer);
448 
449       Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
450 
451       return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
452           new Pair<Integer, Integer>(thisServer, regions.getFirst()),
453           new Pair<Integer, Integer>(otherServer, regions.getSecond())
454 
455       );
456     }
457 
458   }
459 
460   public static class LoadPicker extends RegionPicker {
461 
462     @Override
463     Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
464       cluster.sortServersByRegionCount();
465       int thisServer = pickMostLoadedServer(cluster, -1);
466       int otherServer = pickLeastLoadedServer(cluster, thisServer);
467 
468       Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
469       return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
470           new Pair<Integer, Integer>(thisServer, regions.getFirst()),
471           new Pair<Integer, Integer>(otherServer, regions.getSecond())
472 
473       );
474     }
475 
476     private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
477       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
478 
479       int index = 0;
480       while (servers[index] == null || servers[index] == thisServer) {
481         index++;
482         if (index == servers.length) {
483           return -1;
484         }
485       }
486       return servers[index];
487     }
488 
489     private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
490       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
491 
492       int index = servers.length - 1;
493       while (servers[index] == null || servers[index] == thisServer) {
494         index--;
495         if (index < 0) {
496           return -1;
497         }
498       }
499       return servers[index];
500     }
501   }
502 
503   static class LocalityBasedPicker extends RegionPicker {
504 
505     private MasterServices masterServices;
506 
507     LocalityBasedPicker(MasterServices masterServices) {
508       this.masterServices = masterServices;
509     }
510 
511     @Override
512     Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
513       if (this.masterServices == null) {
514         return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
515             new Pair<Integer, Integer>(-1,-1),
516             new Pair<Integer, Integer>(-1,-1)
517         );
518       }
519       // Pick a random region server
520       int thisServer = pickRandomServer(cluster);
521 
522       // Pick a random region on this server
523       int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
524 
525       if (thisRegion == -1) {
526         return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
527             new Pair<Integer, Integer>(-1,-1),
528             new Pair<Integer, Integer>(-1,-1)
529         );
530       }
531 
532       // Pick the server with the highest locality
533       int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion);
534 
535       // pick an region on the other server to potentially swap
536       int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
537 
538       return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
539           new Pair<Integer, Integer>(thisServer,thisRegion),
540           new Pair<Integer, Integer>(otherServer,otherRegion)
541       );
542     }
543 
544     private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
545       int[] regionLocations = cluster.regionLocations[thisRegion];
546 
547       if (regionLocations == null || regionLocations.length <= 1) {
548         return pickOtherRandomServer(cluster, thisServer);
549       }
550 
551       for (int loc : regionLocations) {
552         if (loc >= 0 && loc != thisServer) { // find the first suitable server
553           return loc;
554         }
555       }
556 
557       // no location found
558       return pickOtherRandomServer(cluster, thisServer);
559     }
560 
561     void setServices(MasterServices services) {
562       this.masterServices = services;
563     }
564   }
565 
566   /**
567    * Base class of StochasticLoadBalancer's Cost Functions.
568    */
569   public abstract static class CostFunction {
570 
571     private float multiplier = 0;
572 
573     CostFunction(Configuration c) {
574 
575     }
576 
577     float getMultiplier() {
578       return multiplier;
579     }
580 
581     void setMultiplier(float m) {
582       this.multiplier = m;
583     }
584 
585     abstract double cost(Cluster cluster);
586 
587     /**
588      * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
589      * assumes that this is a zero sum set of costs.  It assumes that the worst case
590      * possible is all of the elements in one region server and the rest having 0.
591      *
592      * @param stats the costs
593      * @return a scaled set of costs.
594      */
595     protected double costFromArray(double[] stats) {
596       double totalCost = 0;
597       double total = getSum(stats);
598       double mean = total/((double)stats.length);
599       double count = stats.length;
600 
601       // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
602       // a zero sum cost for this to make sense.
603       // TODO: Should we make this sum of square errors?
604       double max = ((count - 1) * mean) + (total - mean);
605       for (double n : stats) {
606         double diff = Math.abs(mean - n);
607         totalCost += diff;
608       }
609 
610       double scaled =  scale(0, max, totalCost);
611       return scaled;
612     }
613 
614 
615 
616     private double getSum(double[] stats) {
617       double total = 0;
618       for(double s:stats) {
619         total += s;
620       }
621       return total;
622     }
623 
624     /**
625      * Scale the value between 0 and 1.
626      *
627      * @param min   Min value
628      * @param max   The Max value
629      * @param value The value to be scaled.
630      * @return The scaled value.
631      */
632     protected double scale(double min, double max, double value) {
633       if (max <= min || value <= min) {
634         return 0;
635       }
636 
637       return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
638     }
639   }
640 
641   /**
642    * Given the starting state of the regions and a potential ending state
643    * compute cost based upon the number of regions that have moved.
644    */
645   public static class MoveCostFunction extends CostFunction {
646     private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
647     private static final String MAX_MOVES_PERCENT_KEY =
648         "hbase.master.balancer.stochastic.maxMovePercent";
649     private static final float DEFAULT_MOVE_COST = 100;
650     private static final int DEFAULT_MAX_MOVES = 600;
651     private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
652 
653     private final float maxMovesPercent;
654 
655     MoveCostFunction(Configuration conf) {
656       super(conf);
657 
658       // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
659       // that large benefits are need to overcome the cost of a move.
660       this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
661       // What percent of the number of regions a single run of the balancer can move.
662       maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
663     }
664 
665     @Override
666     double cost(Cluster cluster) {
667       // Try and size the max number of Moves, but always be prepared to move some.
668       int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
669           DEFAULT_MAX_MOVES);
670 
671       double moveCost = cluster.numMovedRegions;
672 
673       // Don't let this single balance move more than the max moves,
674       // or move a region that should be on master away from the master.
675       // It is ok to move any master hosted region back to the master.
676       // This allows better scaling to accurately represent the actual cost of a move.
677       if (moveCost > maxMoves || cluster.numMovedMasterHostedRegions > 0) {
678         return 1000000;   // return a number much greater than any of the other cost
679       }
680 
681       return scale(0, cluster.numRegions, moveCost);
682     }
683   }
684 
685   /**
686    * Compute the cost of a potential cluster state from skew in number of
687    * regions on a cluster.
688    */
689   public static class RegionCountSkewCostFunction extends CostFunction {
690     private static final String REGION_COUNT_SKEW_COST_KEY =
691         "hbase.master.balancer.stochastic.regionCountCost";
692     private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
693 
694     private double activeMasterWeight;
695     private double backupMasterWeight;
696     private double[] stats = null;
697 
698     RegionCountSkewCostFunction(Configuration conf,
699         double activeMasterWeight, double backupMasterWeight) {
700       super(conf);
701       // Load multiplier should be the greatest as it is the most general way to balance data.
702       this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
703       this.activeMasterWeight = activeMasterWeight;
704       this.backupMasterWeight = backupMasterWeight;
705     }
706 
707     @Override
708     double cost(Cluster cluster) {
709       if (stats == null || stats.length != cluster.numServers) {
710         stats = new double[cluster.numServers];
711       }
712 
713       for (int i =0; i < cluster.numServers; i++) {
714         stats[i] = cluster.regionsPerServer[i].length;
715         // Use some weight on regions assigned to active/backup masters,
716         // so that they won't carry as many regions as normal regionservers.
717         if (cluster.isActiveMaster(i)) {
718           stats[i] += cluster.numUserRegionsOnMaster * (activeMasterWeight - 1);
719         } else if (cluster.isBackupMaster(i)) {
720           stats[i] *= backupMasterWeight;
721         }
722       }
723       return costFromArray(stats);
724     }
725   }
726 
727   /**
728    * Compute the cost of a potential cluster configuration based upon how evenly
729    * distributed tables are.
730    */
731   public static class TableSkewCostFunction extends CostFunction {
732 
733     private static final String TABLE_SKEW_COST_KEY =
734         "hbase.master.balancer.stochastic.tableSkewCost";
735     private static final float DEFAULT_TABLE_SKEW_COST = 35;
736 
737     TableSkewCostFunction(Configuration conf) {
738       super(conf);
739       this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
740     }
741 
742     @Override
743     double cost(Cluster cluster) {
744       double max = cluster.numRegions;
745       double min = ((double) cluster.numRegions) / cluster.numServers;
746       double value = 0;
747 
748       for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
749         value += cluster.numMaxRegionsPerTable[i];
750       }
751 
752       return scale(min, max, value);
753     }
754   }
755 
756   /**
757    * Compute a cost of a potential cluster configuration based upon where
758    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
759    */
760   public static class LocalityCostFunction extends CostFunction {
761 
762     private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
763     private static final float DEFAULT_LOCALITY_COST = 25;
764 
765     private MasterServices services;
766 
767     LocalityCostFunction(Configuration conf, MasterServices srv) {
768       super(conf);
769       this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
770       this.services = srv;
771     }
772 
773     void setServices(MasterServices srvc) {
774       this.services = srvc;
775     }
776 
777     @Override
778     double cost(Cluster cluster) {
779       double max = 0;
780       double cost = 0;
781 
782       // If there's no master so there's no way anything else works.
783       if (this.services == null) {
784         return cost;
785       }
786 
787       for (int i = 0; i < cluster.regionLocations.length; i++) {
788         max += 1;
789         int serverIndex = cluster.regionIndexToServerIndex[i];
790         int[] regionLocations = cluster.regionLocations[i];
791 
792         // If we can't find where the data is getTopBlock returns null.
793         // so count that as being the best possible.
794         if (regionLocations == null) {
795           continue;
796         }
797 
798         int index = -1;
799         for (int j = 0; j < regionLocations.length; j++) {
800           if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
801             index = j;
802             break;
803           }
804         }
805 
806         if (index < 0) {
807           cost += 1;
808         } else {
809           cost += (double) index / (double) regionLocations.length;
810         }
811       }
812       return scale(0, max, cost);
813     }
814   }
815 
816   /**
817    * Base class the allows writing costs functions from rolling average of some
818    * number from RegionLoad.
819    */
820   public abstract static class CostFromRegionLoadFunction extends CostFunction {
821 
822     private ClusterStatus clusterStatus = null;
823     private Map<String, Deque<RegionLoad>> loads = null;
824     private double[] stats = null;
825     CostFromRegionLoadFunction(Configuration conf) {
826       super(conf);
827     }
828 
829     void setClusterStatus(ClusterStatus status) {
830       this.clusterStatus = status;
831     }
832 
833     void setLoads(Map<String, Deque<RegionLoad>> l) {
834       this.loads = l;
835     }
836 
837 
838     @Override
839     double cost(Cluster cluster) {
840       if (clusterStatus == null || loads == null) {
841         return 0;
842       }
843 
844       if (stats == null || stats.length != cluster.numServers) {
845         stats = new double[cluster.numServers];
846       }
847 
848       for (int i =0; i < stats.length; i++) {
849         //Cost this server has from RegionLoad
850         long cost = 0;
851 
852         // for every region on this server get the rl
853         for(int regionIndex:cluster.regionsPerServer[i]) {
854           Collection<RegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
855 
856           // Now if we found a region load get the type of cost that was requested.
857           if (regionLoadList != null) {
858             cost += getRegionLoadCost(regionLoadList);
859           }
860         }
861 
862         // Add the total cost to the stats.
863         stats[i] = cost;
864       }
865 
866       // Now return the scaled cost from data held in the stats object.
867       return costFromArray(stats);
868     }
869 
870     protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
871       double cost = 0;
872 
873       for (RegionLoad rl : regionLoadList) {
874         double toAdd = getCostFromRl(rl);
875 
876         if (cost == 0) {
877           cost = toAdd;
878         } else {
879           cost = (.5 * cost) + (.5 * toAdd);
880         }
881       }
882 
883       return cost;
884     }
885 
886     protected abstract double getCostFromRl(RegionLoad rl);
887   }
888 
889   /**
890    * Compute the cost of total number of read requests  The more unbalanced the higher the
891    * computed cost will be.  This uses a rolling average of regionload.
892    */
893 
894   public static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
895 
896     private static final String READ_REQUEST_COST_KEY =
897         "hbase.master.balancer.stochastic.readRequestCost";
898     private static final float DEFAULT_READ_REQUEST_COST = 5;
899 
900     ReadRequestCostFunction(Configuration conf) {
901       super(conf);
902       this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
903     }
904 
905 
906     @Override
907     protected double getCostFromRl(RegionLoad rl) {
908       return rl.getReadRequestsCount();
909     }
910   }
911 
912   /**
913    * Compute the cost of total number of write requests.  The more unbalanced the higher the
914    * computed cost will be.  This uses a rolling average of regionload.
915    */
916   public static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
917 
918     private static final String WRITE_REQUEST_COST_KEY =
919         "hbase.master.balancer.stochastic.writeRequestCost";
920     private static final float DEFAULT_WRITE_REQUEST_COST = 5;
921 
922     WriteRequestCostFunction(Configuration conf) {
923       super(conf);
924       this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
925     }
926 
927     @Override
928     protected double getCostFromRl(RegionLoad rl) {
929       return rl.getWriteRequestsCount();
930     }
931   }
932 
933   /**
934    * Compute the cost of total memstore size.  The more unbalanced the higher the
935    * computed cost will be.  This uses a rolling average of regionload.
936    */
937   public static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
938 
939     private static final String MEMSTORE_SIZE_COST_KEY =
940         "hbase.master.balancer.stochastic.memstoreSizeCost";
941     private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
942 
943     MemstoreSizeCostFunction(Configuration conf) {
944       super(conf);
945       this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
946     }
947 
948     @Override
949     protected double getCostFromRl(RegionLoad rl) {
950       return rl.getMemStoreSizeMB();
951     }
952   }
953   /**
954    * Compute the cost of total open storefiles size.  The more unbalanced the higher the
955    * computed cost will be.  This uses a rolling average of regionload.
956    */
957   public static class StoreFileCostFunction extends CostFromRegionLoadFunction {
958 
959     private static final String STOREFILE_SIZE_COST_KEY =
960         "hbase.master.balancer.stochastic.storefileSizeCost";
961     private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
962 
963     StoreFileCostFunction(Configuration conf) {
964       super(conf);
965       this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
966     }
967 
968     @Override
969     protected double getCostFromRl(RegionLoad rl) {
970       return rl.getStorefileSizeMB();
971     }
972   }
973 }