
1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayDeque;
21  import java.util.Arrays;
22  import java.util.Collection;
23  import java.util.Deque;
24  import java.util.HashMap;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Map.Entry;
29  import java.util.Random;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.ClusterStatus;
36  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.RegionLoad;
40  import org.apache.hadoop.hbase.ServerLoad;
41  import org.apache.hadoop.hbase.ServerName;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.master.MasterServices;
44  import org.apache.hadoop.hbase.master.RegionPlan;
45  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
46  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
47  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
48  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
49  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52  
53  /**
54   * <p>This is a best-effort load balancer. Given a cost function F(C) =&gt; x, it will
55   * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C), then the
56   * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
57   * <ul>
58   * <li>Region Load</li>
59   * <li>Table Load</li>
60   * <li>Data Locality</li>
61   * <li>Memstore Sizes</li>
62   * <li>Storefile Sizes</li>
63   * </ul>
64   *
65   *
66   * <p>Every cost function returns a number between 0 and 1 inclusive, where 0 is the lowest cost
67   * (best solution) and 1 is the highest possible cost (worst solution).  The computed costs are
68   * scaled by their respective multipliers:</p>
69   *
70   * <ul>
71   *   <li>hbase.master.balancer.stochastic.regionCountCost</li>
72   *   <li>hbase.master.balancer.stochastic.moveCost</li>
73   *   <li>hbase.master.balancer.stochastic.tableSkewCost</li>
74   *   <li>hbase.master.balancer.stochastic.localityCost</li>
75   *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
76   *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
77   * </ul>
78   *
79   * <p>In addition to the above configurations, the balancer can be tuned by the following
80   * configuration values:</p>
81   * <ul>
82   *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
83   *   controls the maximum number of regions that can be moved in a single invocation of this
84   *   balancer.</li>
85   *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
86   *   regions is multiplied to try to get the number of times the balancer will
87   *   mutate all servers.</li>
88   *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
89   *   the balancer will try to mutate all the servers. The balancer will use the minimum of this
90   *   value and the above computation.</li>
91   * </ul>
92   *
93   * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
94   * so that the balancer gets the full picture of all loads on the cluster.</p>
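 * <p>As a minimal illustration (example values only, not recommendations), the multipliers and
 * search bounds above can be set on the master's configuration before it is handed to the
 * balancer:</p>
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * // Weight region-count skew more heavily than locality (illustrative values).
 * conf.setFloat("hbase.master.balancer.stochastic.regionCountCost", 500f);
 * conf.setFloat("hbase.master.balancer.stochastic.localityCost", 25f);
 * // Cap how long a single balancer invocation may search for a better plan, in ms.
 * conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 30 * 1000L);
 * }</pre>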
95   */
96  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
97  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
98    justification="Complaint is about costFunctions not being synchronized; not end of the world")
99  public class StochasticLoadBalancer extends BaseLoadBalancer {
100 
101   protected static final String STEPS_PER_REGION_KEY =
102       "hbase.master.balancer.stochastic.stepsPerRegion";
103   protected static final String MAX_STEPS_KEY =
104       "hbase.master.balancer.stochastic.maxSteps";
105   protected static final String MAX_RUNNING_TIME_KEY =
106       "hbase.master.balancer.stochastic.maxRunningTime";
107   protected static final String KEEP_REGION_LOADS =
108       "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
109   private static final String TABLE_FUNCTION_SEP = "_";
110 
111   private static final Random RANDOM = new Random(System.currentTimeMillis());
112   private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
113 
114   Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
115 
116   // values are defaults
117   private int maxSteps = 1000000;
118   private int stepsPerRegion = 800;
119   private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
120   private int numRegionLoadsToRemember = 15;
121 
122   private CandidateGenerator[] candidateGenerators;
123   private CostFromRegionLoadFunction[] regionLoadFunctions;
124   private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
125 
126   // to save and report costs to JMX
127   private Double curOverallCost = 0d;
128   private Double[] tempFunctionCosts;
129   private Double[] curFunctionCosts;
130 
131   // Keep locality based picker and cost function to alert them
132   // when new services are offered
133   private LocalityBasedCandidateGenerator localityCandidateGenerator;
134   private LocalityCostFunction localityCost;
135   private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
136   private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
137   private boolean isByTable = false;
138   private TableName tableName = null;
139   
140   /**
141    * The constructor that passes a MetricsStochasticBalancer to BaseLoadBalancer to replace its
142    * default MetricsBalancer.
143    */
144   public StochasticLoadBalancer() {
145     super(new MetricsStochasticBalancer());
146   }
147 
148   @Override
149   public void onConfigurationChange(Configuration conf) {
150     setConf(conf);
151   }
152 
153   @Override
154   public synchronized void setConf(Configuration conf) {
155     super.setConf(conf);
156     LOG.info("loading config");
157 
158     maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
159 
160     stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
161     maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
162 
163     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
164     isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
165 
166     if (localityCandidateGenerator == null) {
167       localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
168     }
169     localityCost = new LocalityCostFunction(conf, services);
170 
171     if (candidateGenerators == null) {
172       candidateGenerators = new CandidateGenerator[] {
173           new RandomCandidateGenerator(),
174           new LoadCandidateGenerator(),
175           localityCandidateGenerator,
176           new RegionReplicaRackCandidateGenerator(),
177       };
178     }
179 
180     regionLoadFunctions = new CostFromRegionLoadFunction[] {
181       new ReadRequestCostFunction(conf),
182       new WriteRequestCostFunction(conf),
183       new MemstoreSizeCostFunction(conf),
184       new StoreFileCostFunction(conf)
185     };
186 
187     regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
188     regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
189 
190     costFunctions = new CostFunction[]{
191       new RegionCountSkewCostFunction(conf),
192       new PrimaryRegionCountSkewCostFunction(conf),
193       new MoveCostFunction(conf),
194       localityCost,
195       new TableSkewCostFunction(conf),
196       regionReplicaHostCostFunction,
197       regionReplicaRackCostFunction,
198       regionLoadFunctions[0],
199       regionLoadFunctions[1],
200       regionLoadFunctions[2],
201       regionLoadFunctions[3],
202     };
203     
204     curFunctionCosts = new Double[costFunctions.length];
205     tempFunctionCosts = new Double[costFunctions.length];
206 
207   }
208 
209   @Override
210   protected void setSlop(Configuration conf) {
211     this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
212   }
213 
214   @Override
215   public synchronized void setClusterStatus(ClusterStatus st) {
216     super.setClusterStatus(st);
217     updateRegionLoad();
218     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
219       cost.setClusterStatus(st);
220     }
221 
222     // update metrics size
223     try {
224       // by-table or ensemble mode
225       int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
226       int functionsCount = getCostFunctionNames().length;
227 
228       updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
229     } catch (Exception e) {
230       LOG.error("failed to get the size of all tables, exception = " + e.getMessage());
231     }
232   }
233   
234   /**
235    * Update the number of metrics that are reported to JMX
236    */
237   public void updateMetricsSize(int size) {
238     if (metricsBalancer instanceof MetricsStochasticBalancer) {
239         ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
240     }
241   }
242 
243   @Override
244   public synchronized void setMasterServices(MasterServices masterServices) {
245     super.setMasterServices(masterServices);
246     this.localityCost.setServices(masterServices);
247     this.localityCandidateGenerator.setServices(masterServices);
248 
249   }
250 
251   @Override
252   protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
253     regionReplicaHostCostFunction.init(c);
254     if (regionReplicaHostCostFunction.cost() > 0) return true;
255     regionReplicaRackCostFunction.init(c);
256     if (regionReplicaRackCostFunction.cost() > 0) return true;
257     return false;
258   }
259 
260   @Override
261   public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
262     List<HRegionInfo>> clusterState) {
263     this.tableName = tableName;
264     return balanceCluster(clusterState);
265   }
266   
267   /**
268    * Given the cluster state this will try to approach an optimal balance. This
269    * should always approach the optimal state given enough steps.
270    */
271   @Override
272   public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
273     List<HRegionInfo>> clusterState) {
274     List<RegionPlan> plans = balanceMasterRegions(clusterState);
275     if (plans != null || clusterState == null || clusterState.size() <= 1) {
276       return plans;
277     }
278 
279     if (masterServerName != null && clusterState.containsKey(masterServerName)) {
280       if (clusterState.size() <= 2) {
281         return null;
282       }
283       clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
284       clusterState.remove(masterServerName);
285     }
286 
287     // On clusters with lots of HFileLinks or lots of reference files,
288     // instantiating the storefile infos can be quite expensive.
289     // Allow turning this feature off if the locality cost is not going to
290     // be used in any computations.
291     RegionLocationFinder finder = null;
292     if (this.localityCost != null && this.localityCost.getMultiplier() > 0) {
293       finder = this.regionFinder;
294     }
295 
296     // The clusterState that is given to this method contains the state
297     // of all the regions in the table(s) (that's true today).
298     // Keep track of servers to iterate through them.
299     Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
300 
301     if (!needsBalance(cluster)) {
302       return null;
303     }
304 
305     long startTime = EnvironmentEdgeManager.currentTime();
306 
307     initCosts(cluster);
308 
309     double currentCost = computeCost(cluster, Double.MAX_VALUE);
310     curOverallCost = currentCost;
311     for (int i = 0; i < this.curFunctionCosts.length; i++) {
312       curFunctionCosts[i] = tempFunctionCosts[i];
313     }
314 
315     double initCost = currentCost;
316     double newCost = currentCost;
317 
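    // Bound the walk: regions * stepsPerRegion * servers steps, but never more than maxSteps.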
318     long computedMaxSteps = Math.min(this.maxSteps,
319         ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
320     // Perform a stochastic walk to see if we can get a good fit.
321     long step;
322 
323     for (step = 0; step < computedMaxSteps; step++) {
324       int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
325       CandidateGenerator p = candidateGenerators[generatorIdx];
326       Cluster.Action action = p.generate(cluster);
327 
328       if (action.type == Type.NULL) {
329         continue;
330       }
331 
332       cluster.doAction(action);
333       updateCostsWithAction(cluster, action);
334 
335       newCost = computeCost(cluster, currentCost);
336 
337       // Should this be kept?
338       if (newCost < currentCost) {
339         currentCost = newCost;
340 
341         // save for JMX
342         curOverallCost = currentCost;
343         for (int i = 0; i < this.curFunctionCosts.length; i++) {
344           curFunctionCosts[i] = tempFunctionCosts[i];
345         }
346       } else {
347         // Put things back the way they were before.
348         // TODO: undo by remembering old values
349         Action undoAction = action.undoAction();
350         cluster.doAction(undoAction);
351         updateCostsWithAction(cluster, undoAction);
352       }
353 
354       if (EnvironmentEdgeManager.currentTime() - startTime >
355           maxRunningTime) {
356         break;
357       }
358     }
359     long endTime = EnvironmentEdgeManager.currentTime();
360 
361     metricsBalancer.balanceCluster(endTime - startTime);
362 
363     // update costs metrics
364     updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
365     if (initCost > currentCost) {
366       plans = createRegionPlans(cluster);
367       if (LOG.isDebugEnabled()) {
368         LOG.debug("Finished computing new load balance plan.  Computation took "
369             + (endTime - startTime) + "ms to try " + step
370             + " different iterations.  Found a solution that moves "
371             + plans.size() + " regions; Going from a computed cost of "
372             + initCost + " to a new cost of " + currentCost);
373       }
374       
375       return plans;
376     }
377     if (LOG.isDebugEnabled()) {
378       LOG.debug("Could not find a better load balance plan.  Tried "
379           + step + " different configurations in " + (endTime - startTime)
380           + "ms, and did not find anything with a computed cost less than " + initCost);
381     }
382     return null;
383   }
384   
385   /**
386    * Update the costs reported to JMX.
387    */
388   private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
389     if (tableName == null) return;
390 
391     // check if the metricsBalancer is MetricsStochasticBalancer before casting
392     if (metricsBalancer instanceof MetricsStochasticBalancer) {
393       MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
394       // overall cost
395       balancer.updateStochasticCost(tableName.getNameAsString(), 
396         "Overall", "Overall cost", overall);
397 
398       // each cost function
399       for (int i = 0; i < costFunctions.length; i++) {
400         CostFunction costFunction = costFunctions[i];
401         String costFunctionName = costFunction.getClass().getSimpleName();
402         Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
403         // TODO: cost function may need a specific description
404         balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
405           "The percent of " + costFunctionName, costPercent);
406       }
407     }
408   }
409 
410 
411   /**
412    * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
413    * state.
414    *
415    * @param cluster The state of the cluster
416    * @return List of RegionPlan's that represent the moves needed to get to desired final state.
417    */
418   private List<RegionPlan> createRegionPlans(Cluster cluster) {
419     List<RegionPlan> plans = new LinkedList<RegionPlan>();
420     for (int regionIndex = 0;
421          regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
422       int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
423       int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
424 
425       if (initialServerIndex != newServerIndex) {
426         HRegionInfo region = cluster.regions[regionIndex];
427         ServerName initialServer = cluster.servers[initialServerIndex];
428         ServerName newServer = cluster.servers[newServerIndex];
429 
430         if (LOG.isTraceEnabled()) {
431           LOG.trace("Moving Region " + region.getEncodedName() + " from server "
432               + initialServer.getHostname() + " to " + newServer.getHostname());
433         }
434         RegionPlan rp = new RegionPlan(region, initialServer, newServer);
435         plans.add(rp);
436       }
437     }
438     return plans;
439   }
440 
441   /**
442    * Store the current region loads.
443    */
444   private synchronized void updateRegionLoad() {
445     // We create a new hashmap so that regions that are no longer there are removed.
446     // However we temporarily need the old loads so we can use them to keep the rolling average.
447     Map<String, Deque<RegionLoad>> oldLoads = loads;
448     loads = new HashMap<String, Deque<RegionLoad>>();
449 
450     for (ServerName sn : clusterStatus.getServers()) {
451       ServerLoad sl = clusterStatus.getLoad(sn);
452       if (sl == null) {
453         continue;
454       }
455       for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
456         Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
457         if (rLoads == null) {
458           // There was nothing there
459           rLoads = new ArrayDeque<RegionLoad>();
460         } else if (rLoads.size() >= numRegionLoadsToRemember) {
461           rLoads.remove();
462         }
463         rLoads.add(entry.getValue());
464         loads.put(Bytes.toString(entry.getKey()), rLoads);
465 
466       }
467     }
468 
469     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
470       cost.setLoads(loads);
471     }
472   }
473 
474   protected void initCosts(Cluster cluster) {
475     for (CostFunction c:costFunctions) {
476       c.init(cluster);
477     }
478   }
479 
480   protected void updateCostsWithAction(Cluster cluster, Action action) {
481     for (CostFunction c : costFunctions) {
482       c.postAction(action);
483     }
484   }
485 
486   /**
487    * Get the names of the cost functions
488    */
489   public String[] getCostFunctionNames() {
490     if (costFunctions == null) return null;
491     String[] ret = new String[costFunctions.length];
492     for (int i = 0; i < costFunctions.length; i++) {
493       CostFunction c = costFunctions[i];
494       ret[i] = c.getClass().getSimpleName();
495     }
496 
497     return ret;
498   }
499 
500   /**
501    * This is the main cost function.  It will compute a cost associated with a proposed cluster
502    * state.  All different costs will be combined with their multipliers to produce a double cost.
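   * <p>For illustration (example numbers only): with two active cost functions whose multipliers
   * are 500 and 100 and whose current costs are 0.2 and 0.1, the aggregate is
   * 500 * 0.2 + 100 * 0.1 = 110. If previousCost were 90, the loop would stop early as soon as
   * the running total exceeded it.</p>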
503    *
504    * @param cluster The state of the cluster
505    * @param previousCost the previous cost. This is used as an early out.
506    * @return a double of a cost associated with the proposed cluster state.  This cost is an
507    *         aggregate of all individual cost functions.
508    */
509   protected double computeCost(Cluster cluster, double previousCost) {
510     double total = 0;
511 
512     for (int i = 0; i < costFunctions.length; i++) {
513       CostFunction c = costFunctions[i];
514       this.tempFunctionCosts[i] = 0.0;
515       
516       if (c.getMultiplier() <= 0) {
517         continue;
518       }
519 
520       Float multiplier = c.getMultiplier();
521       Double cost = c.cost();
522 
523       this.tempFunctionCosts[i] = multiplier * cost;
524       total += this.tempFunctionCosts[i];
525 
526       if (total > previousCost) {
527         break;
528       }
529     }
530    
531     return total;
532   }
533 
534   /** Generates a candidate action to be applied to the cluster for cost function search */
535   abstract static class CandidateGenerator {
536     abstract Cluster.Action generate(Cluster cluster);
537 
538     /**
539      * From a list of regions pick a random one. -1 can be returned, which the caller
540      * recognizes as a signal to try a region move
541      * rather than a swap.
542      *
543      * @param cluster        The state of the cluster
544      * @param server         index of the server
545      * @param chanceOfNoSwap Chance that this will decide to try a move rather
546      *                       than a swap.
547      * @return a random region index, or -1 if an asymmetrical move (no swap) is
548      *         suggested.
549      */
550     protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
551       // Check to see if this is just a move.
552       if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
553         // signal a move only.
554         return -1;
555       }
556       int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
557       return cluster.regionsPerServer[server][rand];
558 
559     }
560     protected int pickRandomServer(Cluster cluster) {
561       if (cluster.numServers < 1) {
562         return -1;
563       }
564 
565       return RANDOM.nextInt(cluster.numServers);
566     }
567 
568     protected int pickRandomRack(Cluster cluster) {
569       if (cluster.numRacks < 1) {
570         return -1;
571       }
572 
573       return RANDOM.nextInt(cluster.numRacks);
574     }
575 
576     protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
577       if (cluster.numServers < 2) {
578         return -1;
579       }
580       while (true) {
581         int otherServerIndex = pickRandomServer(cluster);
582         if (otherServerIndex != serverIndex) {
583           return otherServerIndex;
584         }
585       }
586     }
587 
588     protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
589       if (cluster.numRacks < 2) {
590         return -1;
591       }
592       while (true) {
593         int otherRackIndex = pickRandomRack(cluster);
594         if (otherRackIndex != rackIndex) {
595           return otherRackIndex;
596         }
597       }
598     }
599 
600     protected Cluster.Action pickRandomRegions(Cluster cluster,
601                                                        int thisServer,
602                                                        int otherServer) {
603       if (thisServer < 0 || otherServer < 0) {
604         return Cluster.NullAction;
605       }
606 
607       // Decide who is most likely to need another region
608       int thisRegionCount = cluster.getNumRegions(thisServer);
609       int otherRegionCount = cluster.getNumRegions(otherServer);
610 
611       // Assign the chance based upon the above
612       double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
613       double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
614 
615       int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
616       int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
617 
618       return getAction(thisServer, thisRegion, otherServer, otherRegion);
619     }
620 
621     protected Cluster.Action getAction(int fromServer, int fromRegion,
622         int toServer, int toRegion) {
623       if (fromServer < 0 || toServer < 0) {
624         return Cluster.NullAction;
625       }
626       if (fromRegion > 0 && toRegion > 0) {
627         return new Cluster.SwapRegionsAction(fromServer, fromRegion,
628           toServer, toRegion);
629       } else if (fromRegion > 0) {
630         return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
631       } else if (toRegion > 0) {
632         return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
633       } else {
634         return Cluster.NullAction;
635       }
636     }
637   }
638 
639   static class RandomCandidateGenerator extends CandidateGenerator {
640 
641     @Override
642     Cluster.Action generate(Cluster cluster) {
643 
644       int thisServer = pickRandomServer(cluster);
645 
646       // Pick the other server
647       int otherServer = pickOtherRandomServer(cluster, thisServer);
648 
649       return pickRandomRegions(cluster, thisServer, otherServer);
650     }
651   }
652 
653   static class LoadCandidateGenerator extends CandidateGenerator {
654 
655     @Override
656     Cluster.Action generate(Cluster cluster) {
657       cluster.sortServersByRegionCount();
658       int thisServer = pickMostLoadedServer(cluster, -1);
659       int otherServer = pickLeastLoadedServer(cluster, thisServer);
660 
661       return pickRandomRegions(cluster, thisServer, otherServer);
662     }
663 
664     private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
665       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
666 
667       int index = 0;
668       while (servers[index] == null || servers[index] == thisServer) {
669         index++;
670         if (index == servers.length) {
671           return -1;
672         }
673       }
674       return servers[index];
675     }
676 
677     private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
678       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
679 
680       int index = servers.length - 1;
681       while (servers[index] == null || servers[index] == thisServer) {
682         index--;
683         if (index < 0) {
684           return -1;
685         }
686       }
687       return servers[index];
688     }
689   }
690 
691   static class LocalityBasedCandidateGenerator extends CandidateGenerator {
692 
693     private MasterServices masterServices;
694 
695     LocalityBasedCandidateGenerator(MasterServices masterServices) {
696       this.masterServices = masterServices;
697     }
698 
699     @Override
700     Cluster.Action generate(Cluster cluster) {
701       if (this.masterServices == null) {
702         int thisServer = pickRandomServer(cluster);
703         // Pick the other server
704         int otherServer = pickOtherRandomServer(cluster, thisServer);
705         return pickRandomRegions(cluster, thisServer, otherServer);
706       }
707 
708       cluster.calculateRegionServerLocalities();
709       // Pick server with lowest locality
710       int thisServer = pickLowestLocalityServer(cluster);
711       int thisRegion;
712       if (thisServer == -1) {
713         LOG.warn("Could not pick lowest locality region server");
714         return Cluster.NullAction;
715       } else {
716       // Pick lowest locality region on this server
717         thisRegion = pickLowestLocalityRegionOnServer(cluster, thisServer);
718       }
719 
720       if (thisRegion == -1) {
721         return Cluster.NullAction;
722       }
723 
724       // Pick the least loaded server with good locality for the region
725       int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion);
726 
727       if (otherServer == -1) {
728         return Cluster.NullAction;
729       }
730 
731       // Let the candidate region be moved to its highest locality server.
732       int otherRegion = -1;
733 
734       return getAction(thisServer, thisRegion, otherServer, otherRegion);
735     }
736 
737     private int pickLowestLocalityServer(Cluster cluster) {
738       return cluster.getLowestLocalityRegionServer();
739     }
740 
741     private int pickLowestLocalityRegionOnServer(Cluster cluster, int server) {
742       return cluster.getLowestLocalityRegionOnServer(server);
743     }
744 
745     void setServices(MasterServices services) {
746       this.masterServices = services;
747     }
748   }
749 
750   /**
751    * Generates candidates that move replicas off a region server when replicas of the
752    * same region are co-hosted on it.
753    */
754   static class RegionReplicaCandidateGenerator extends CandidateGenerator {
755 
756     RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
757 
758     /**
759      * Randomly select one regionIndex out of all region replicas co-hosted in the same group
760      * (a group is a server, host or rack)
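     * <p>For example (illustrative), if primariesOfRegionsPerGroup is {0, 0, 3, 3, 3, 7}, primaries
     * 0 and 3 each have co-hosted replicas; one of them is chosen with equal probability via
     * reservoir sampling, and the returned regionIndex is a non-primary replica of that primary.</p>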
761      * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
762      * primariesOfRegionsPerHost or primariesOfRegionsPerRack
763      * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
764      * @param regionIndexToPrimaryIndex Cluster.regionIndexToPrimaryIndex
765      * @return a regionIndex for the selected primary or -1 if there is no co-location
766      */
767     int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
768         , int[] regionIndexToPrimaryIndex) {
769       int currentPrimary = -1;
770       int currentPrimaryIndex = -1;
771       int selectedPrimaryIndex = -1;
772       double currentLargestRandom = -1;
773       // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
774         // ids for the regions hosted in the group, a consecutive repetition means that replicas
775       // are co-hosted
776       for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
777         int primary = j < primariesOfRegionsPerGroup.length
778             ? primariesOfRegionsPerGroup[j] : -1;
779         if (primary != currentPrimary) { // check for whether we see a new primary
780           int numReplicas = j - currentPrimaryIndex;
781           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
782             // decide to select this primary region id or not
783             double currentRandom = RANDOM.nextDouble();
784             // we don't know how many region replicas are co-hosted, we will randomly select one
785             // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
786             if (currentRandom > currentLargestRandom) {
787               selectedPrimaryIndex = currentPrimary;
788               currentLargestRandom = currentRandom;
789             }
790           }
791           currentPrimary = primary;
792           currentPrimaryIndex = j;
793         }
794       }
795 
796       // we have found the primary id for the region to move. Now find the actual regionIndex
797       // with the given primary, prefer to move the secondary region.
798       for (int j = 0; j < regionsPerGroup.length; j++) {
799         int regionIndex = regionsPerGroup[j];
800         if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
801           // always move the secondary, not the primary
802           if (selectedPrimaryIndex != regionIndex) {
803             return regionIndex;
804           }
805         }
806       }
807       return -1;
808     }
809 
810     @Override
811     Cluster.Action generate(Cluster cluster) {
812       int serverIndex = pickRandomServer(cluster);
813       if (cluster.numServers <= 1 || serverIndex == -1) {
814         return Cluster.NullAction;
815       }
816 
817       int regionIndex = selectCoHostedRegionPerGroup(
818         cluster.primariesOfRegionsPerServer[serverIndex],
819         cluster.regionsPerServer[serverIndex],
820         cluster.regionIndexToPrimaryIndex);
821 
822       // if there are no pairs of region replicas co-hosted, default to random generator
823       if (regionIndex == -1) {
824         // default to randompicker
825         return randomGenerator.generate(cluster);
826       }
827 
828       int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
829       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
830       return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
831     }
832   }
833 
834   /**
835    * Generates candidates that move replicas out of a rack when replicas of the
836    * same region are co-hosted in that rack.
837    */
838   static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
839     @Override
840     Cluster.Action generate(Cluster cluster) {
841       int rackIndex = pickRandomRack(cluster);
842       if (cluster.numRacks <= 1 || rackIndex == -1) {
843         return super.generate(cluster);
844       }
845 
846       int regionIndex = selectCoHostedRegionPerGroup(
847         cluster.primariesOfRegionsPerRack[rackIndex],
848         cluster.regionsPerRack[rackIndex],
849         cluster.regionIndexToPrimaryIndex);
850 
851       // if there are no pairs of region replicas co-hosted, default to random generator
852       if (regionIndex == -1) {
853         // default to randompicker
854         return randomGenerator.generate(cluster);
855       }
856 
857       int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
858       int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
859 
860       int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
861       int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
862       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
863       return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
864     }
865   }
866 
867   /**
868    * Base class of StochasticLoadBalancer's Cost Functions.
869    */
870   abstract static class CostFunction {
871 
872     private float multiplier = 0;
873 
874     protected Cluster cluster;
875 
876     CostFunction(Configuration c) {
877 
878     }
879 
880     float getMultiplier() {
881       return multiplier;
882     }
883 
884     void setMultiplier(float m) {
885       this.multiplier = m;
886     }
887 
888     /** Called once per LB invocation to give the cost function
889      * a chance to initialize its state and perform any costly calculations.
890      */
891     void init(Cluster cluster) {
892       this.cluster = cluster;
893     }
894 
895     /** Called once per cluster Action to give the cost function
896      * an opportunity to update its state. postAction() is always
897      * called at least once before cost() is called with the cluster
898      * that this action is performed on. */
899     void postAction(Action action) {
900       switch (action.type) {
901       case NULL: break;
902       case ASSIGN_REGION:
903         AssignRegionAction ar = (AssignRegionAction) action;
904         regionMoved(ar.region, -1, ar.server);
905         break;
906       case MOVE_REGION:
907         MoveRegionAction mra = (MoveRegionAction) action;
908         regionMoved(mra.region, mra.fromServer, mra.toServer);
909         break;
910       case SWAP_REGIONS:
911         SwapRegionsAction a = (SwapRegionsAction) action;
912         regionMoved(a.fromRegion, a.fromServer, a.toServer);
913         regionMoved(a.toRegion, a.toServer, a.fromServer);
914         break;
915       default:
916         throw new RuntimeException("Unknown action: " + action.type);
917       }
918     }
919 
920     protected void regionMoved(int region, int oldServer, int newServer) {
921     }
922 
923     abstract double cost();
924 
925     /**
926      * Function to compute a scaled cost from an array of per-server costs. It
927      * assumes that this is a zero-sum set of costs and that the worst case
928      * possible is all of the elements on one region server and the rest having 0.
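     * <p>A small worked example (illustrative only): for stats = {3, 1}, total = 4 and mean = 2,
     * so max = (1 * 2) + (4 - 2) = 4 and min = 0; the summed deviation is |2 - 3| + |2 - 1| = 2,
     * giving a scaled cost of scale(0, 4, 2) = 0.5.</p>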
929      *
930      * @param stats the costs
931      * @return a scaled set of costs.
932      */
933     protected double costFromArray(double[] stats) {
934       double totalCost = 0;
935       double total = getSum(stats);
936 
937       double count = stats.length;
938       double mean = total/count;
939 
940       // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
941       // a zero sum cost for this to make sense.
942       double max = ((count - 1) * mean) + (total - mean);
943 
944       // It's possible that there aren't enough regions to go around
945       double min;
946       if (count > total) {
947         min = ((count - total) * mean) + ((1 - mean) * total);
948       } else {
949         // Some will have 1 more than everything else.
950         int numHigh = (int) (total - (Math.floor(mean) * count));
951         int numLow = (int) (count - numHigh);
952 
953         min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
954 
955       }
956       min = Math.max(0, min);
957       for (int i=0; i<stats.length; i++) {
958         double n = stats[i];
959         double diff = Math.abs(mean - n);
960         totalCost += diff;
961       }
962 
963       double scaled =  scale(min, max, totalCost);
964       return scaled;
965     }
966 
967     private double getSum(double[] stats) {
968       double total = 0;
969       for(double s:stats) {
970         total += s;
971       }
972       return total;
973     }
974 
975     /**
976      * Scale the value between 0 and 1.
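     * <p>For example, scale(0, 10, 5) returns 0.5; values at or below min scale to 0.</p>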
977      *
978      * @param min   Min value
979      * @param max   The Max value
980      * @param value The value to be scaled.
981      * @return The scaled value.
982      */
983     protected double scale(double min, double max, double value) {
984       if (max <= min || value <= min) {
985         return 0;
986       }
987       if ((max - min) == 0) return 0;
988 
989       return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
990     }
991   }
992 
993   /**
994    * Given the starting state of the regions and a potential ending state
995    * compute cost based upon the number of regions that have moved.
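   * <p>For illustration: with 1,000 regions and the default 25% cap, up to
   * max(250, 600) = 600 moves are tolerated, and a plan that moves 150 regions
   * scores scale(0, 600, 150) = 0.25 before the multiplier is applied.</p>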
996    */
997   static class MoveCostFunction extends CostFunction {
998     private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
999     private static final String MAX_MOVES_PERCENT_KEY =
1000         "hbase.master.balancer.stochastic.maxMovePercent";
1001     private static final float DEFAULT_MOVE_COST = 100;
1002     private static final int DEFAULT_MAX_MOVES = 600;
1003     private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1004 
1005     private final float maxMovesPercent;
1006 
1007     MoveCostFunction(Configuration conf) {
1008       super(conf);
1009 
1010       // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
1011       // that large benefits are needed to overcome the cost of a move.
1012       this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1013       // What percent of the number of regions a single run of the balancer can move.
1014       maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1015     }
1016 
1017     @Override
1018     double cost() {
1019       // Try to size the max number of moves, but always be prepared to move some.
1020       int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1021           DEFAULT_MAX_MOVES);
1022 
1023       double moveCost = cluster.numMovedRegions;
1024 
1025       // Don't let this single balance move more than the max moves.
1026       // This allows better scaling to accurately represent the actual cost of a move.
1027       if (moveCost > maxMoves) {
1028         return 1000000;   // return a number much greater than any of the other cost
1029       }
1030 
1031       return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
1032     }
1033   }
1034 
1035   /**
1036    * Compute the cost of a potential cluster state from skew in number of
1037    * regions on a cluster.
1038    */
1039   static class RegionCountSkewCostFunction extends CostFunction {
1040     private static final String REGION_COUNT_SKEW_COST_KEY =
1041         "hbase.master.balancer.stochastic.regionCountCost";
1042     private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1043 
1044     private double[] stats = null;
1045 
1046     RegionCountSkewCostFunction(Configuration conf) {
1047       super(conf);
1048       // Load multiplier should be the greatest as it is the most general way to balance data.
1049       this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1050     }
1051 
1052     @Override
1053     double cost() {
1054       if (stats == null || stats.length != cluster.numServers) {
1055         stats = new double[cluster.numServers];
1056       }
1057 
1058       for (int i =0; i < cluster.numServers; i++) {
1059         stats[i] = cluster.regionsPerServer[i].length;
1060       }
1061 
1062       return costFromArray(stats);
1063     }
1064   }
1065 
1066   /**
1067    * Compute the cost of a potential cluster state from skew in number of
1068    * primary regions on a cluster.
1069    */
1070   static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1071     private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1072         "hbase.master.balancer.stochastic.primaryRegionCountCost";
1073     private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1074 
1075     private double[] stats = null;
1076 
1077     PrimaryRegionCountSkewCostFunction(Configuration conf) {
1078       super(conf);
1079       // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1080       this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1081         DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1082     }
1083 
1084     @Override
1085     double cost() {
1086       if (!cluster.hasRegionReplicas) {
1087         return 0;
1088       }
1089       if (stats == null || stats.length != cluster.numServers) {
1090         stats = new double[cluster.numServers];
1091       }
1092 
1093       for (int i =0; i < cluster.numServers; i++) {
1094         stats[i] = 0;
1095         for (int regionIdx : cluster.regionsPerServer[i]) {
1096           if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1097             stats[i] ++;
1098           }
1099         }
1100       }
1101 
1102       return costFromArray(stats);
1103     }
1104   }
1105 
1106   /**
1107    * Compute the cost of a potential cluster configuration based upon how evenly
1108    * distributed tables are.
1109    */
1110   static class TableSkewCostFunction extends CostFunction {
1111 
1112     private static final String TABLE_SKEW_COST_KEY =
1113         "hbase.master.balancer.stochastic.tableSkewCost";
1114     private static final float DEFAULT_TABLE_SKEW_COST = 35;
1115 
1116     TableSkewCostFunction(Configuration conf) {
1117       super(conf);
1118       this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1119     }
1120 
1121     @Override
1122     double cost() {
1123       double max = cluster.numRegions;
1124       double min = ((double) cluster.numRegions) / cluster.numServers;
1125       double value = 0;
1126 
1127       for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1128         value += cluster.numMaxRegionsPerTable[i];
1129       }
1130 
1131       return scale(min, max, value);
1132     }
1133   }
1134 
1135   /**
1136    * Compute a cost of a potential cluster configuration based upon where
1137    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
1138    */
1139   static class LocalityCostFunction extends CostFunction {
1140 
1141     private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1142     private static final float DEFAULT_LOCALITY_COST = 25;
1143 
1144     private MasterServices services;
1145 
1146     LocalityCostFunction(Configuration conf, MasterServices srv) {
1147       super(conf);
1148       this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
1149       this.services = srv;
1150     }
1151 
1152     void setServices(MasterServices srvc) {
1153       this.services = srvc;
1154     }
1155 
1156     @Override
1157     double cost() {
1158       double max = 0;
1159       double cost = 0;
1160 
1161       // If there's no master then there's no way anything else works.
1162       if (this.services == null) {
1163         return cost;
1164       }
1165 
1166       for (int i = 0; i < cluster.regionLocations.length; i++) {
1167         max += 1;
1168         int serverIndex = cluster.regionIndexToServerIndex[i];
1169         int[] regionLocations = cluster.regionLocations[i];
1170 
1171         // If we can't find where the data is, getTopBlock returns null,
1172         // so count that as being the best possible.
1173         if (regionLocations == null) {
1174           continue;
1175         }
1176 
1177         int index = -1;
1178         for (int j = 0; j < regionLocations.length; j++) {
1179           if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
1180             index = j;
1181             break;
1182           }
1183         }
1184 
1185         if (index < 0) {
1186           cost += 1;
1187         } else {
1188           cost += (1 - cluster.getLocalityOfRegion(i, index));
1189         }
1190       }
1191       return scale(0, max, cost);
1192     }
1193   }
1194 
1195   /**
1196    * Base class that allows writing cost functions based on a rolling average of some
1197    * number from RegionLoad.
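   * <p>For example (illustrative), with remembered values {100, 200, 300} the exponentially
   * weighted average used as the cost is 0.5 * (0.5 * 100 + 0.5 * 200) + 0.5 * 300 = 225, so
   * more recent samples dominate.</p>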
1198    */
1199   abstract static class CostFromRegionLoadFunction extends CostFunction {
1200 
1201     private ClusterStatus clusterStatus = null;
1202     private Map<String, Deque<RegionLoad>> loads = null;
1203     private double[] stats = null;
1204     CostFromRegionLoadFunction(Configuration conf) {
1205       super(conf);
1206     }
1207 
1208     void setClusterStatus(ClusterStatus status) {
1209       this.clusterStatus = status;
1210     }
1211 
1212     void setLoads(Map<String, Deque<RegionLoad>> l) {
1213       this.loads = l;
1214     }
1215 
1216     @Override
1217     double cost() {
1218       if (clusterStatus == null || loads == null) {
1219         return 0;
1220       }
1221 
1222       if (stats == null || stats.length != cluster.numServers) {
1223         stats = new double[cluster.numServers];
1224       }
1225 
1226       for (int i =0; i < stats.length; i++) {
1227         //Cost this server has from RegionLoad
1228         long cost = 0;
1229 
1230         // for every region on this server get the rl
1231         for(int regionIndex:cluster.regionsPerServer[i]) {
1232           Collection<RegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1233 
1234           // Now if we found a region load get the type of cost that was requested.
1235           if (regionLoadList != null) {
1236             cost += getRegionLoadCost(regionLoadList);
1237           }
1238         }
1239 
1240         // Add the total cost to the stats.
1241         stats[i] = cost;
1242       }
1243 
1244       // Now return the scaled cost from data held in the stats object.
1245       return costFromArray(stats);
1246     }
1247 
1248     protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
1249       double cost = 0;
1250 
1251       for (RegionLoad rl : regionLoadList) {
1252         double toAdd = getCostFromRl(rl);
1253 
1254         if (cost == 0) {
1255           cost = toAdd;
1256         } else {
1257           cost = (.5 * cost) + (.5 * toAdd);
1258         }
1259       }
1260 
1261       return cost;
1262     }
1263 
1264     protected abstract double getCostFromRl(RegionLoad rl);
1265   }
1266 
1267   /**
1268    * Compute the cost of the total number of read requests.  The more unbalanced, the higher the
1269    * computed cost will be.  This uses a rolling average of RegionLoad.
1270    */
1271 
1272   static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
1273 
1274     private static final String READ_REQUEST_COST_KEY =
1275         "hbase.master.balancer.stochastic.readRequestCost";
1276     private static final float DEFAULT_READ_REQUEST_COST = 5;
1277 
1278     ReadRequestCostFunction(Configuration conf) {
1279       super(conf);
1280       this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1281     }
1282 
1283 
1284     @Override
1285     protected double getCostFromRl(RegionLoad rl) {
1286       return rl.getReadRequestsCount();
1287     }
1288   }
1289 
1290   /**
1291    * Compute the cost of the total number of write requests.  The more unbalanced, the higher the
1292    * computed cost will be.  This uses a rolling average of RegionLoad.
1293    */
1294   static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
1295 
1296     private static final String WRITE_REQUEST_COST_KEY =
1297         "hbase.master.balancer.stochastic.writeRequestCost";
1298     private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1299 
1300     WriteRequestCostFunction(Configuration conf) {
1301       super(conf);
1302       this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1303     }
1304 
1305     @Override
1306     protected double getCostFromRl(RegionLoad rl) {
1307       return rl.getWriteRequestsCount();
1308     }
1309   }
1310 
1311   /**
1312    * A cost function for region replicas. We give a very high cost to hosting
1313    * replicas of the same region in the same host. We do not prevent the case
1314    * though, since if numReplicas > numRegionServers, we still want to keep the
1315    * replica open.
1316    */
1317   static class RegionReplicaHostCostFunction extends CostFunction {
1318     private static final String REGION_REPLICA_HOST_COST_KEY =
1319         "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1320     private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1321 
1322     long maxCost = 0;
1323     long[] costsPerGroup; // group is either server, host or rack
1324     int[][] primariesOfRegionsPerGroup;
1325 
1326     public RegionReplicaHostCostFunction(Configuration conf) {
1327       super(conf);
1328       this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1329         DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1330     }
1331 
1332     @Override
1333     void init(Cluster cluster) {
1334       super.init(cluster);
1335       // max cost is the case where every region replica is hosted together regardless of host
1336       maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1337       costsPerGroup = new long[cluster.numHosts];
1338       primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1339           ? cluster.primariesOfRegionsPerHost
1340           : cluster.primariesOfRegionsPerServer;
1341       for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1342         costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1343       }
1344     }
1345 
1346     long getMaxCost(Cluster cluster) {
1347       if (!cluster.hasRegionReplicas) {
1348         return 0; // short circuit
1349       }
1350       // max cost is the case where every region replica is hosted together regardless of host
1351       int[] primariesOfRegions = new int[cluster.numRegions];
1352       System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1353           cluster.regions.length);
1354 
1355       Arrays.sort(primariesOfRegions);
1356 
1357       // compute numReplicas from the sorted array
1358       return costPerGroup(primariesOfRegions);
1359     }
1360 
1361     @Override
1362     double cost() {
1363       if (maxCost <= 0) {
1364         return 0;
1365       }
1366 
1367       long totalCost = 0;
1368       for (int i = 0 ; i < costsPerGroup.length; i++) {
1369         totalCost += costsPerGroup[i];
1370       }
1371       return scale(0, maxCost, totalCost);
1372     }
1373 
1374     /**
1375      * For each primary region, it computes the total number of replicas in the array (numReplicas)
1376      * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1377      * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1378      * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1379      * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1380      * @return a sum of numReplicas-1 squared for each primary region in the group.
1381      */
1382     protected long costPerGroup(int[] primariesOfRegions) {
1383       long cost = 0;
1384       int currentPrimary = -1;
1385       int currentPrimaryIndex = -1;
1386       // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1387       // sharing the same primary will have consecutive numbers in the array.
1388       for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1389         int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1390         if (primary != currentPrimary) { // we see a new primary
1391           int numReplicas = j - currentPrimaryIndex;
1392           // square the cost
1393           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1394             cost += (numReplicas - 1) * (numReplicas - 1);
1395           }
1396           currentPrimary = primary;
1397           currentPrimaryIndex = j;
1398         }
1399       }
1400 
1401       return cost;
1402     }
1403 
1404     @Override
1405     protected void regionMoved(int region, int oldServer, int newServer) {
1406       if (maxCost <= 0) {
1407         return; // no need to compute
1408       }
1409       if (cluster.multiServersPerHost) {
1410         int oldHost = cluster.serverIndexToHostIndex[oldServer];
1411         int newHost = cluster.serverIndexToHostIndex[newServer];
1412         if (newHost != oldHost) {
1413           costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1414           costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1415         }
1416       } else {
1417         costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1418         costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1419       }
1420     }
1421   }
1422 
1423   /**
1424    * A cost function for region replicas for the rack distribution. We give a relatively high
1425    * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1426    * though.
1427    */
1428   static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1429     private static final String REGION_REPLICA_RACK_COST_KEY =
1430         "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1431     private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1432 
1433     public RegionReplicaRackCostFunction(Configuration conf) {
1434       super(conf);
1435       this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, 
1436         DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1437     }
1438 
1439     @Override
1440     void init(Cluster cluster) {
1441       this.cluster = cluster;
1442       if (cluster.numRacks <= 1) {
1443         maxCost = 0;
1444         return; // disabled for 1 rack
1445       }
1446       // max cost is the case where every region replica is hosted together regardless of rack
1447       maxCost = getMaxCost(cluster);
1448       costsPerGroup = new long[cluster.numRacks];
1449       for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1450         costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1451       }
1452     }
1453 
1454     @Override
1455     protected void regionMoved(int region, int oldServer, int newServer) {
1456       if (maxCost <= 0) {
1457         return; // no need to compute
1458       }
1459       int oldRack = cluster.serverIndexToRackIndex[oldServer];
1460       int newRack = cluster.serverIndexToRackIndex[newServer];
1461       if (newRack != oldRack) {
1462         costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1463         costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1464       }
1465     }
1466   }
1467 
1468   /**
1469    * Compute the cost of the total memstore size.  The more unbalanced, the higher the
1470    * computed cost will be.  This uses a rolling average of RegionLoad.
1471    */
1472   static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
1473 
1474     private static final String MEMSTORE_SIZE_COST_KEY =
1475         "hbase.master.balancer.stochastic.memstoreSizeCost";
1476     private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1477 
1478     MemstoreSizeCostFunction(Configuration conf) {
1479       super(conf);
1480       this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1481     }
1482 
1483     @Override
1484     protected double getCostFromRl(RegionLoad rl) {
1485       return rl.getMemStoreSizeMB();
1486     }
1487   }
1488   /**
1489    * Compute the cost of the total open storefile size.  The more unbalanced, the higher the
1490    * computed cost will be.  This uses a rolling average of RegionLoad.
1491    */
1492   static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1493 
1494     private static final String STOREFILE_SIZE_COST_KEY =
1495         "hbase.master.balancer.stochastic.storefileSizeCost";
1496     private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1497 
1498     StoreFileCostFunction(Configuration conf) {
1499       super(conf);
1500       this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1501     }
1502 
1503     @Override
1504     protected double getCostFromRl(RegionLoad rl) {
1505       return rl.getStorefileSizeMB();
1506     }
1507   }
1508   
1509   /**
1510    * A helper function to compose the attribute name from the table name and the cost function name
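   * (e.g., {@code composeAttributeName("t1", "MoveCostFunction")} returns {@code "t1_MoveCostFunction"}).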
1511    */
1512   public static String composeAttributeName(String tableName, String costFunctionName) {
1513     return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1514   }
1515 }