View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayDeque;
21  import java.util.Arrays;
22  import java.util.Collection;
23  import java.util.Deque;
24  import java.util.HashMap;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Map.Entry;
29  import java.util.Random;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.ClusterStatus;
36  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.RegionLoad;
40  import org.apache.hadoop.hbase.ServerLoad;
41  import org.apache.hadoop.hbase.ServerName;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.master.MasterServices;
44  import org.apache.hadoop.hbase.master.RegionPlan;
45  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
46  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
47  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
48  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
49  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
50  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
51  import org.apache.hadoop.hbase.util.Bytes;
52  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
53
54  /**
55   * <p>This is a best effort load balancer. Given a cost function F(C) =&gt; x, it will
56   * randomly try to mutate the cluster to Cprime. If F(Cprime) &lt; F(C) then the
57   * new cluster state becomes the plan. It includes cost functions to compute the cost of:</p>
58   * <ul>
59   * <li>Region Load</li>
60   * <li>Table Load</li>
61   * <li>Data Locality</li>
62   * <li>Memstore Sizes</li>
63   * <li>Storefile Sizes</li>
64   * </ul>
65   *
66   *
67   * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
68   * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
69   * scaled by their respective multipliers:</p>
70   *
71   * <ul>
72   *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
73   *   <li>hbase.master.balancer.stochastic.moveCost</li>
74   *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
75   *   <li>hbase.master.balancer.stochastic.localityCost</li>
76   *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
77   *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
78   * </ul>
79   *
80   * <p>In addition to the above configurations, the balancer can be tuned by the following
81   * configuration values:</p>
82   * <ul>
83   *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
84   *   controls the maximum number of regions that can be moved in a single invocation of this
85   *   balancer.</li>
86   *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
87   *   regions is multiplied to try and get the number of times the balancer will
88   *   mutate all servers.</li>
89   *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
90   *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
91   *   value and the above computation.</li>
92   * </ul>
93   *
94   * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
95   * so that the balancer gets the full picture of all loads on the cluster.</p>
96   */
97  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
98  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
99    justification="Complaint is about costFunctions not being synchronized; not end of the world")
100 public class StochasticLoadBalancer extends BaseLoadBalancer {
101
102   protected static final String STEPS_PER_REGION_KEY =
103       "hbase.master.balancer.stochastic.stepsPerRegion";
104   protected static final String MAX_STEPS_KEY =
105       "hbase.master.balancer.stochastic.maxSteps";
106   protected static final String MAX_RUNNING_TIME_KEY =
107       "hbase.master.balancer.stochastic.maxRunningTime";
108   protected static final String KEEP_REGION_LOADS =
109       "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
110   private static final String TABLE_FUNCTION_SEP = "_";
111   protected static final String MIN_COST_NEED_BALANCE_KEY =
112       "hbase.master.balancer.stochastic.minCostNeedBalance";
113 
114   private static final Random RANDOM = new Random(System.currentTimeMillis());
115   private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
116
117   Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
118
119   // values are defaults
120   private int maxSteps = 1000000;
121   private int stepsPerRegion = 800;
122   private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
123   private int numRegionLoadsToRemember = 15;
124   private float minCostNeedBalance = 0.05f;
125 
126   private CandidateGenerator[] candidateGenerators;
127   private CostFromRegionLoadFunction[] regionLoadFunctions;
128   private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
129
130   // to save and report costs to JMX
131   private Double curOverallCost = 0d;
132   private Double[] tempFunctionCosts;
133   private Double[] curFunctionCosts;
134
135   // Keep locality based picker and cost function to alert them
136   // when new services are offered
137   private LocalityBasedCandidateGenerator localityCandidateGenerator;
138   private LocalityCostFunction localityCost;
139   private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
140   private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
141   private boolean isByTable = false;
142   private TableName tableName = null;
143
144   /**
145    * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
146    * default MetricsBalancer
147    */
148   public StochasticLoadBalancer() {
149     super(new MetricsStochasticBalancer());
150   }
151
152   @Override
153   public void onConfigurationChange(Configuration conf) {
154     setConf(conf);
155   }
156
157   @Override
158   public synchronized void setConf(Configuration conf) {
159     super.setConf(conf);
160     LOG.info("loading config");
161
162     maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
163
164     stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
165     maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
166
167     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
168     isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
169
170     minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
171
172     if (localityCandidateGenerator == null) {
173       localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
174     }
175     localityCost = new LocalityCostFunction(conf, services);
176
177     if (candidateGenerators == null) {
178       candidateGenerators = new CandidateGenerator[] {
179           new RandomCandidateGenerator(),
180           new LoadCandidateGenerator(),
181           localityCandidateGenerator,
182           new RegionReplicaRackCandidateGenerator(),
183       };
184     }
185
186     regionLoadFunctions = new CostFromRegionLoadFunction[] {
187       new ReadRequestCostFunction(conf),
188       new WriteRequestCostFunction(conf),
189       new MemstoreSizeCostFunction(conf),
190       new StoreFileCostFunction(conf)
191     };
192
193     regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
194     regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
195
196     costFunctions = new CostFunction[]{
197       new RegionCountSkewCostFunction(conf),
198       new PrimaryRegionCountSkewCostFunction(conf),
199       new MoveCostFunction(conf),
200       localityCost,
201       new TableSkewCostFunction(conf),
202       regionReplicaHostCostFunction,
203       regionReplicaRackCostFunction,
204       regionLoadFunctions[0],
205       regionLoadFunctions[1],
206       regionLoadFunctions[2],
207       regionLoadFunctions[3],
208     };
209
210     curFunctionCosts= new Double[costFunctions.length];
211     tempFunctionCosts= new Double[costFunctions.length];
212
213   }
214
215   @Override
216   protected void setSlop(Configuration conf) {
217     this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
218   }
219
220   @Override
221   public synchronized void setClusterStatus(ClusterStatus st) {
222     super.setClusterStatus(st);
223     updateRegionLoad();
224     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
225       cost.setClusterStatus(st);
226     }
227 
228     // update metrics size
229     try {
230       // by-table or ensemble mode
231       int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
232       int functionsCount = getCostFunctionNames().length;
233 
234       updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
235     } catch (Exception e) {
236       LOG.error("failed to get the size of all tables, exception = " + e.getMessage());
237     }
238   }
239
240   /**
241    * Update the number of metrics that are reported to JMX
242    */
243   public void updateMetricsSize(int size) {
244     if (metricsBalancer instanceof MetricsStochasticBalancer) {
245         ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
246     }
247   }
248 
249   @Override
250   public synchronized void setMasterServices(MasterServices masterServices) {
251     super.setMasterServices(masterServices);
252     this.localityCost.setServices(masterServices);
253     this.localityCandidateGenerator.setServices(masterServices);
254
255   }
256
257   @Override
258   protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
259     regionReplicaHostCostFunction.init(c);
260     if (regionReplicaHostCostFunction.cost() > 0) return true;
261     regionReplicaRackCostFunction.init(c);
262     if (regionReplicaRackCostFunction.cost() > 0) return true;
263     return false;
264   }
265
266   @Override
267   protected boolean needsBalance(Cluster cluster) {
268     ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
269     if (cs.getNumServers() < MIN_SERVER_BALANCE) {
270       if (LOG.isDebugEnabled()) {
271         LOG.debug("Not running balancer because only " + cs.getNumServers()
272             + " active regionserver(s)");
273       }
274       return false;
275     }
276     if (areSomeRegionReplicasColocated(cluster)) {
277       return true;
278     }
279
280     double total = 0.0;
281     float sumMultiplier = 0.0f;
282     for (CostFunction c : costFunctions) {
283       float multiplier = c.getMultiplier();
284       if (multiplier <= 0) {
285         continue;
286       }
287       sumMultiplier += multiplier;
288       total += c.cost() * multiplier;
289     }
290
291     if (total <= 0 || sumMultiplier <= 0
292         || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) {
293       LOG.info("Skipping load balancing because balanced cluster; " + "total cost is " + total
294           + ", sum multiplier is " + sumMultiplier + " min cost which need balance is "
295           + minCostNeedBalance);
296       return false;
297     }
298     return true;
299   }
300 
301   @Override
302   public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
303     List<HRegionInfo>> clusterState) {
304     this.tableName = tableName;
305     return balanceCluster(clusterState);
306   }
307
308   /**
309    * Given the cluster state this will try and approach an optimal balance. This
310    * should always approach the optimal state given enough steps.
311    */
312   @Override
313   public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
314     List<HRegionInfo>> clusterState) {
315     List<RegionPlan> plans = balanceMasterRegions(clusterState);
316     if (plans != null || clusterState == null || clusterState.size() <= 1) {
317       return plans;
318     }
319
320     if (masterServerName != null && clusterState.containsKey(masterServerName)) {
321       if (clusterState.size() <= 2) {
322         return null;
323       }
324       clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
325       clusterState.remove(masterServerName);
326     }
327 
328     // On clusters with lots of HFileLinks or lots of reference files,
329     // instantiating the storefile infos can be quite expensive.
330     // Allow turning this feature off if the locality cost is not going to
331     // be used in any computations.
332     RegionLocationFinder finder = null;
333     if (this.localityCost != null && this.localityCost.getMultiplier() > 0) {
334       finder = this.regionFinder;
335     }
336 
337     //The clusterState that is given to this method contains the state
338     //of all the regions in the table(s) (that's true today)
339     // Keep track of servers to iterate through them.
340     Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
341
342     long startTime = EnvironmentEdgeManager.currentTime();
343
344     initCosts(cluster);
345
346     if (!needsBalance(cluster)) {
347       return null;
348     }
349
350     double currentCost = computeCost(cluster, Double.MAX_VALUE);
351     curOverallCost = currentCost;
352     for (int i = 0; i < this.curFunctionCosts.length; i++) {
353       curFunctionCosts[i] = tempFunctionCosts[i];
354     }
355     LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
356         + functionCost());
357
358     double initCost = currentCost;
359     double newCost = currentCost;
360 
361     long computedMaxSteps = Math.min(this.maxSteps,
362         ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
363     // Perform a stochastic walk to see if we can get a good fit.
364     long step;
365
366     for (step = 0; step < computedMaxSteps; step++) {
367       int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
368       CandidateGenerator p = candidateGenerators[generatorIdx];
369       Cluster.Action action = p.generate(cluster);
370
371       if (action.type == Type.NULL) {
372         continue;
373       }
374 
375       cluster.doAction(action);
376       updateCostsWithAction(cluster, action);
377
378       newCost = computeCost(cluster, currentCost);
379
380       // Should this be kept?
381       if (newCost < currentCost) {
382         currentCost = newCost;
383
384         // save for JMX
385         curOverallCost = currentCost;
386         for (int i = 0; i < this.curFunctionCosts.length; i++) {
387           curFunctionCosts[i] = tempFunctionCosts[i];
388         }
389       } else {
390         // Put things back the way they were before.
391         // TODO: undo by remembering old values
392         Action undoAction = action.undoAction();
393         cluster.doAction(undoAction);
394         updateCostsWithAction(cluster, undoAction);
395       }
396
397       if (EnvironmentEdgeManager.currentTime() - startTime >
398           maxRunningTime) {
399         break;
400       }
401     }
402     long endTime = EnvironmentEdgeManager.currentTime();
403
404     metricsBalancer.balanceCluster(endTime - startTime);
405
406     // update costs metrics
407     updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
408     if (initCost > currentCost) {
409       plans = createRegionPlans(cluster);
410       if (LOG.isDebugEnabled()) {
411         LOG.debug("Finished computing new load balance plan.  Computation took "
412             + (endTime - startTime) + "ms to try " + step
413             + " different iterations.  Found a solution that moves "
414             + plans.size() + " regions; Going from a computed cost of "
415             + initCost + " to a new cost of " + currentCost);
416       }
417
418       return plans;
419     }
420     if (LOG.isDebugEnabled()) {
421       LOG.debug("Could not find a better load balance plan.  Tried "
422           + step + " different configurations in " + (endTime - startTime)
423           + "ms, and did not find anything with a computed cost less than " + initCost);
424     }
425     return null;
426   }
427
428   /**
429    * update costs to JMX
430    */
431   private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
432     if (tableName == null) return;
433
434     // check if the metricsBalancer is MetricsStochasticBalancer before casting
435     if (metricsBalancer instanceof MetricsStochasticBalancer) {
436       MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
437       // overall cost
438       balancer.updateStochasticCost(tableName.getNameAsString(),
439         "Overall", "Overall cost", overall);
440 
441       // each cost function
442       for (int i = 0; i < costFunctions.length; i++) {
443         CostFunction costFunction = costFunctions[i];
444         String costFunctionName = costFunction.getClass().getSimpleName();
445         Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
446         // TODO: cost function may need a specific description
447         balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
448           "The percent of " + costFunctionName, costPercent);
449       }
450     }
451   }
452
453   private String functionCost() {
454     StringBuilder builder = new StringBuilder();
455     for (CostFunction c:costFunctions) {
456       builder.append(c.getClass().getSimpleName());
457       builder.append(" : (");
458       builder.append(c.getMultiplier());
459       builder.append(", ");
460       builder.append(c.cost());
461       builder.append("); ");
462     }
463     return builder.toString();
464   }
465 
466   /**
467    * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
468    * state.
469    *
470    * @param cluster The state of the cluster
471    * @return List of RegionPlan's that represent the moves needed to get to desired final state.
472    */
473   private List<RegionPlan> createRegionPlans(Cluster cluster) {
474     List<RegionPlan> plans = new LinkedList<RegionPlan>();
475     for (int regionIndex = 0;
476          regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
477       int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
478       int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
479 
480       if (initialServerIndex != newServerIndex) {
481         HRegionInfo region = cluster.regions[regionIndex];
482         ServerName initialServer = cluster.servers[initialServerIndex];
483         ServerName newServer = cluster.servers[newServerIndex];
484
485         if (LOG.isTraceEnabled()) {
486           LOG.trace("Moving Region " + region.getEncodedName() + " from server "
487               + initialServer.getHostname() + " to " + newServer.getHostname());
488         }
489         RegionPlan rp = new RegionPlan(region, initialServer, newServer);
490         plans.add(rp);
491       }
492     }
493     return plans;
494   }
495
496   /**
497    * Store the current region loads.
498    */
499   private synchronized void updateRegionLoad() {
500     // We create a new hashmap so that regions that are no longer there are removed.
501     // However we temporarily need the old loads so we can use them to keep the rolling average.
502     Map<String, Deque<RegionLoad>> oldLoads = loads;
503     loads = new HashMap<String, Deque<RegionLoad>>();
504
505     for (ServerName sn : clusterStatus.getServers()) {
506       ServerLoad sl = clusterStatus.getLoad(sn);
507       if (sl == null) {
508         continue;
509       }
510       for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
511         Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
512         if (rLoads == null) {
513           // There was nothing there
514           rLoads = new ArrayDeque<RegionLoad>();
515         } else if (rLoads.size() >= numRegionLoadsToRemember) {
516           rLoads.remove();
517         }
518         rLoads.add(entry.getValue());
519         loads.put(Bytes.toString(entry.getKey()), rLoads);
520
521       }
522     }
523
524     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
525       cost.setLoads(loads);
526     }
527   }
528
529   protected void initCosts(Cluster cluster) {
530     for (CostFunction c:costFunctions) {
531       c.init(cluster);
532     }
533   }
534
535   protected void updateCostsWithAction(Cluster cluster, Action action) {
536     for (CostFunction c : costFunctions) {
537       c.postAction(action);
538     }
539   }
540
541   /**
542    * Get the names of the cost functions
543    */
544   public String[] getCostFunctionNames() {
545     if (costFunctions == null) return null;
546     String[] ret = new String[costFunctions.length];
547     for (int i = 0; i < costFunctions.length; i++) {
548       CostFunction c = costFunctions[i];
549       ret[i] = c.getClass().getSimpleName();
550     }
551
552     return ret;
553   }
554
555   /**
556    * This is the main cost function.  It will compute a cost associated with a proposed cluster
557    * state.  All different costs will be combined with their multipliers to produce a double cost.
558    *
559    * @param cluster The state of the cluster
560    * @param previousCost the previous cost. This is used as an early out.
561    * @return a double of a cost associated with the proposed cluster state.  This cost is an
562    *         aggregate of all individual cost functions.
563    */
564   protected double computeCost(Cluster cluster, double previousCost) {
565     double total = 0;
566
567     for (int i = 0; i < costFunctions.length; i++) {
568       CostFunction c = costFunctions[i];
569       this.tempFunctionCosts[i] = 0.0;
570
571       if (c.getMultiplier() <= 0) {
572         continue;
573       }
574
575       Float multiplier = c.getMultiplier();
576       Double cost = c.cost();
577
578       this.tempFunctionCosts[i] = multiplier*cost;
579       total += this.tempFunctionCosts[i];
580
581       if (total > previousCost) {
582         break;
583       }
584     }
585
586     return total;
587   }
588
589   /** Generates a candidate action to be applied to the cluster for cost function search */
590   abstract static class CandidateGenerator {
591     abstract Cluster.Action generate(Cluster cluster);
592
593     /**
594      * From a list of regions pick a random one. Null can be returned which
595      * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
596      * rather than swap.
597      *
598      * @param cluster        The state of the cluster
599      * @param server         index of the server
600      * @param chanceOfNoSwap Chance that this will decide to try a move rather
601      *                       than a swap.
602      * @return a random {@link HRegionInfo} or null if an asymmetrical move is
603      *         suggested.
604      */
605     protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
606       // Check to see if this is just a move.
607       if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
608         // signal a move only.
609         return -1;
610       }
611       int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
612       return cluster.regionsPerServer[server][rand];
613
614     }
615     protected int pickRandomServer(Cluster cluster) {
616       if (cluster.numServers < 1) {
617         return -1;
618       }
619
620       return RANDOM.nextInt(cluster.numServers);
621     }
622
623     protected int pickRandomRack(Cluster cluster) {
624       if (cluster.numRacks < 1) {
625         return -1;
626       }
627
628       return RANDOM.nextInt(cluster.numRacks);
629     }
630
631     protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
632       if (cluster.numServers < 2) {
633         return -1;
634       }
635       while (true) {
636         int otherServerIndex = pickRandomServer(cluster);
637         if (otherServerIndex != serverIndex) {
638           return otherServerIndex;
639         }
640       }
641     }
642
643     protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
644       if (cluster.numRacks < 2) {
645         return -1;
646       }
647       while (true) {
648         int otherRackIndex = pickRandomRack(cluster);
649         if (otherRackIndex != rackIndex) {
650           return otherRackIndex;
651         }
652       }
653     }
654 
655     protected Cluster.Action pickRandomRegions(Cluster cluster,
656                                                        int thisServer,
657                                                        int otherServer) {
658       if (thisServer < 0 || otherServer < 0) {
659         return Cluster.NullAction;
660       }
661
662       // Decide who is most likely to need another region
663       int thisRegionCount = cluster.getNumRegions(thisServer);
664       int otherRegionCount = cluster.getNumRegions(otherServer);
665
666       // Assign the chance based upon the above
667       double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
668       double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
669
670       int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
671       int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
672
673       return getAction(thisServer, thisRegion, otherServer, otherRegion);
674     }
675
676     protected Cluster.Action getAction(int fromServer, int fromRegion,
677         int toServer, int toRegion) {
678       if (fromServer < 0 || toServer < 0) {
679         return Cluster.NullAction;
680       }
681       if (fromRegion > 0 && toRegion > 0) {
682         return new Cluster.SwapRegionsAction(fromServer, fromRegion,
683           toServer, toRegion);
684       } else if (fromRegion > 0) {
685         return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
686       } else if (toRegion > 0) {
687         return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
688       } else {
689         return Cluster.NullAction;
690       }
691     }
692   }
693
694   static class RandomCandidateGenerator extends CandidateGenerator {
695
696     @Override
697     Cluster.Action generate(Cluster cluster) {
698 
699       int thisServer = pickRandomServer(cluster);
700
701       // Pick the other server
702       int otherServer = pickOtherRandomServer(cluster, thisServer);
703
704       return pickRandomRegions(cluster, thisServer, otherServer);
705     }
706   }
707 
708   static class LoadCandidateGenerator extends CandidateGenerator {
709
710     @Override
711     Cluster.Action generate(Cluster cluster) {
712       cluster.sortServersByRegionCount();
713       int thisServer = pickMostLoadedServer(cluster, -1);
714       int otherServer = pickLeastLoadedServer(cluster, thisServer);
715
716       return pickRandomRegions(cluster, thisServer, otherServer);
717     }
718
719     private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
720       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
721 
722       int index = 0;
723       while (servers[index] == null || servers[index] == thisServer) {
724         index++;
725         if (index == servers.length) {
726           return -1;
727         }
728       }
729       return servers[index];
730     }
731 
732     private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
733       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
734 
735       int index = servers.length - 1;
736       while (servers[index] == null || servers[index] == thisServer) {
737         index--;
738         if (index < 0) {
739           return -1;
740         }
741       }
742       return servers[index];
743     }
744   }
745
746   static class LocalityBasedCandidateGenerator extends CandidateGenerator {
747 
748     private MasterServices masterServices;
749
750     LocalityBasedCandidateGenerator(MasterServices masterServices) {
751       this.masterServices = masterServices;
752     }
753 
754     @Override
755     Cluster.Action generate(Cluster cluster) {
756       if (this.masterServices == null) {
757         int thisServer = pickRandomServer(cluster);
758         // Pick the other server
759         int otherServer = pickOtherRandomServer(cluster, thisServer);
760         return pickRandomRegions(cluster, thisServer, otherServer);
761       }
762
763       int thisServer = pickRandomServer(cluster);
764       int thisRegion;
765       if (thisServer == -1) {
766         LOG.warn("Could not pick lowest locality region server");
767         return Cluster.NullAction;
768       } else {
769       // Pick lowest locality region on this server
770         thisRegion = pickLowestLocalityRegionOnServer(cluster, thisServer);
771       }
772
773       if (thisRegion == -1) {
774         return Cluster.NullAction;
775       }
776
777       // Pick the least loaded server with good locality for the region
778       int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion, thisServer);
779
780       if (otherServer == -1) {
781         return Cluster.NullAction;
782       }
783
784       // Let the candidate region be moved to its highest locality server.
785       int otherRegion = -1;
786
787       return getAction(thisServer, thisRegion, otherServer, otherRegion);
788     }
789
790     private int pickLowestLocalityServer(Cluster cluster) {
791       return cluster.getLowestLocalityRegionServer();
792     }
793 
794     private int pickLowestLocalityRegionOnServer(Cluster cluster, int server) {
795       return cluster.getLowestLocalityRegionOnServer(server);
796     }
797
798     void setServices(MasterServices services) {
799       this.masterServices = services;
800     }
801   }
802
803   /**
804    * Generates candidates which moves the replicas out of the region server for
805    * co-hosted region replicas
806    */
807   static class RegionReplicaCandidateGenerator extends CandidateGenerator {
808
809     RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
810
811     /**
812      * Randomly select one regionIndex out of all region replicas co-hosted in the same group
813      * (a group is a server, host or rack)
814      * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
815      * primariesOfRegionsPerHost or primariesOfRegionsPerRack
816      * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
817      * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
818      * @return a regionIndex for the selected primary or -1 if there is no co-locating
819      */
820     int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
821         , int[] regionIndexToPrimaryIndex) {
822       int currentPrimary = -1;
823       int currentPrimaryIndex = -1;
824       int selectedPrimaryIndex = -1;
825       double currentLargestRandom = -1;
826       // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
827       // ids for the regions hosted in server, a consecutive repetition means that replicas
828       // are co-hosted
829       for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
830         int primary = j < primariesOfRegionsPerGroup.length
831             ? primariesOfRegionsPerGroup[j] : -1;
832         if (primary != currentPrimary) { // check for whether we see a new primary
833           int numReplicas = j - currentPrimaryIndex;
834           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
835             // decide to select this primary region id or not
836             double currentRandom = RANDOM.nextDouble();
837             // we don't know how many region replicas are co-hosted, we will randomly select one
838             // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
839             if (currentRandom > currentLargestRandom) {
840               selectedPrimaryIndex = currentPrimary;
841               currentLargestRandom = currentRandom;
842             }
843           }
844           currentPrimary = primary;
845           currentPrimaryIndex = j;
846         }
847       }
848 
849       // we have found the primary id for the region to move. Now find the actual regionIndex
850       // with the given primary, prefer to move the secondary region.
851       for (int j = 0; j < regionsPerGroup.length; j++) {
852         int regionIndex = regionsPerGroup[j];
853         if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
854           // always move the secondary, not the primary
855           if (selectedPrimaryIndex != regionIndex) {
856             return regionIndex;
857           }
858         }
859       }
860       return -1;
861     }
862
863     @Override
864     Cluster.Action generate(Cluster cluster) {
865       int serverIndex = pickRandomServer(cluster);
866       if (cluster.numServers <= 1 || serverIndex == -1) {
867         return Cluster.NullAction;
868       }
869 
870       int regionIndex = selectCoHostedRegionPerGroup(
871         cluster.primariesOfRegionsPerServer[serverIndex],
872         cluster.regionsPerServer[serverIndex],
873         cluster.regionIndexToPrimaryIndex);
874
875       // if there are no pairs of region replicas co-hosted, default to random generator
876       if (regionIndex == -1) {
877         // default to randompicker
878         return randomGenerator.generate(cluster);
879       }
880
881       int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
882       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
883       return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
884     }
885   }
886
887   /**
888    * Generates candidates which moves the replicas out of the rack for
889    * co-hosted region replicas in the same rack
890    */
891   static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
892     @Override
893     Cluster.Action generate(Cluster cluster) {
894       int rackIndex = pickRandomRack(cluster);
895       if (cluster.numRacks <= 1 || rackIndex == -1) {
896         return super.generate(cluster);
897       }
898
899       int regionIndex = selectCoHostedRegionPerGroup(
900         cluster.primariesOfRegionsPerRack[rackIndex],
901         cluster.regionsPerRack[rackIndex],
902         cluster.regionIndexToPrimaryIndex);
903
904       // if there are no pairs of region replicas co-hosted, default to random generator
905       if (regionIndex == -1) {
906         // default to randompicker
907         return randomGenerator.generate(cluster);
908       }
909
910       int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
911       int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
912
913       int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
914       int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
915       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
916       return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
917     }
918   }
919
920   /**
921    * Base class of StochasticLoadBalancer's Cost Functions.
922    */
923   abstract static class CostFunction {
924
925     private float multiplier = 0;
926
927     protected Cluster cluster;
928
929     CostFunction(Configuration c) {
930
931     }
932
933     float getMultiplier() {
934       return multiplier;
935     }
936
937     void setMultiplier(float m) {
938       this.multiplier = m;
939     }
940
941     /** Called once per LB invocation to give the cost function
942      * to initialize it's state, and perform any costly calculation.
943      */
944     void init(Cluster cluster) {
945       this.cluster = cluster;
946     }
947
948     /** Called once per cluster Action to give the cost function
949      * an opportunity to update it's state. postAction() is always
950      * called at least once before cost() is called with the cluster
951      * that this action is performed on. */
952     void postAction(Action action) {
953       switch (action.type) {
954       case NULL: break;
955       case ASSIGN_REGION:
956         AssignRegionAction ar = (AssignRegionAction) action;
957         regionMoved(ar.region, -1, ar.server);
958         break;
959       case MOVE_REGION:
960         MoveRegionAction mra = (MoveRegionAction) action;
961         regionMoved(mra.region, mra.fromServer, mra.toServer);
962         break;
963       case SWAP_REGIONS:
964         SwapRegionsAction a = (SwapRegionsAction) action;
965         regionMoved(a.fromRegion, a.fromServer, a.toServer);
966         regionMoved(a.toRegion, a.toServer, a.fromServer);
967         break;
968       default:
969         throw new RuntimeException("Uknown action:" + action.type);
970       }
971     }
972 
973     protected void regionMoved(int region, int oldServer, int newServer) {
974     }
975
976     abstract double cost();
977
978     /**
979      * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
980      * assumes that this is a zero sum set of costs.  It assumes that the worst case
981      * possible is all of the elements in one region server and the rest having 0.
982      *
983      * @param stats the costs
984      * @return a scaled set of costs.
985      */
986     protected double costFromArray(double[] stats) {
987       double totalCost = 0;
988       double total = getSum(stats);
989
990       double count = stats.length;
991       double mean = total/count;
992
993       // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
994       // a zero sum cost for this to make sense.
995       double max = ((count - 1) * mean) + (total - mean);
996
997       // It's possible that there aren't enough regions to go around
998       double min;
999       if (count > total) {
1000         min = ((count - total) * mean) + ((1 - mean) * total);
1001       } else {
1002         // Some will have 1 more than everything else.
1003         int numHigh = (int) (total - (Math.floor(mean) * count));
1004         int numLow = (int) (count - numHigh);
1005
1006         min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
1007 
1008       }
1009       min = Math.max(0, min);
1010       for (int i=0; i<stats.length; i++) {
1011         double n = stats[i];
1012         double diff = Math.abs(mean - n);
1013         totalCost += diff;
1014       }
1015
1016       double scaled =  scale(min, max, totalCost);
1017       return scaled;
1018     }
1019
1020     private double getSum(double[] stats) {
1021       double total = 0;
1022       for(double s:stats) {
1023         total += s;
1024       }
1025       return total;
1026     }
1027
1028     /**
1029      * Scale the value between 0 and 1.
1030      *
1031      * @param min   Min value
1032      * @param max   The Max value
1033      * @param value The value to be scaled.
1034      * @return The scaled value.
1035      */
1036     protected double scale(double min, double max, double value) {
1037       if (max <= min || value <= min) {
1038         return 0;
1039       }
1040       if ((max - min) == 0) return 0;
1041 
1042       return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
1043     }
1044   }
1045
1046   /**
1047    * Given the starting state of the regions and a potential ending state
1048    * compute cost based upon the number of regions that have moved.
1049    */
1050   static class MoveCostFunction extends CostFunction {
1051     private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
1052     private static final String MAX_MOVES_PERCENT_KEY =
1053         "hbase.master.balancer.stochastic.maxMovePercent";
1054     private static final float DEFAULT_MOVE_COST = 7;
1055     private static final int DEFAULT_MAX_MOVES = 600;
1056     private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1057
1058     private final float maxMovesPercent;
1059 
1060     MoveCostFunction(Configuration conf) {
1061       super(conf);
1062
1063       // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
1064       // that large benefits are need to overcome the cost of a move.
1065       this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1066       // What percent of the number of regions a single run of the balancer can move.
1067       maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1068     }
1069
1070     @Override
1071     double cost() {
1072       // Try and size the max number of Moves, but always be prepared to move some.
1073       int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1074           DEFAULT_MAX_MOVES);
1075
1076       double moveCost = cluster.numMovedRegions;
1077
1078       // Don't let this single balance move more than the max moves.
1079       // This allows better scaling to accurately represent the actual cost of a move.
1080       if (moveCost > maxMoves) {
1081         return 1000000;   // return a number much greater than any of the other cost
1082       }
1083
1084       return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
1085     }
1086   }
1087
1088   /**
1089    * Compute the cost of a potential cluster state from skew in number of
1090    * regions on a cluster.
1091    */
1092   static class RegionCountSkewCostFunction extends CostFunction {
1093     private static final String REGION_COUNT_SKEW_COST_KEY =
1094         "hbase.master.balancer.stochastic.regionCountCost";
1095     private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1096
1097     private double[] stats = null;
1098
1099     RegionCountSkewCostFunction(Configuration conf) {
1100       super(conf);
1101       // Load multiplier should be the greatest as it is the most general way to balance data.
1102       this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1103     }
1104
1105     @Override
1106     double cost() {
1107       if (stats == null || stats.length != cluster.numServers) {
1108         stats = new double[cluster.numServers];
1109       }
1110
1111       for (int i =0; i < cluster.numServers; i++) {
1112         stats[i] = cluster.regionsPerServer[i].length;
1113       }
1114
1115       return costFromArray(stats);
1116     }
1117   }
1118 
1119   /**
1120    * Compute the cost of a potential cluster state from skew in number of
1121    * primary regions on a cluster.
1122    */
1123   static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1124     private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1125         "hbase.master.balancer.stochastic.primaryRegionCountCost";
1126     private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1127
1128     private double[] stats = null;
1129
1130     PrimaryRegionCountSkewCostFunction(Configuration conf) {
1131       super(conf);
1132       // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1133       this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1134         DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1135     }
1136
1137     @Override
1138     double cost() {
1139       if (!cluster.hasRegionReplicas) {
1140         return 0;
1141       }
1142       if (stats == null || stats.length != cluster.numServers) {
1143         stats = new double[cluster.numServers];
1144       }
1145
1146       for (int i =0; i < cluster.numServers; i++) {
1147         stats[i] = 0;
1148         for (int regionIdx : cluster.regionsPerServer[i]) {
1149           if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1150             stats[i] ++;
1151           }
1152         }
1153       }
1154
1155       return costFromArray(stats);
1156     }
1157   }
1158 
1159   /**
1160    * Compute the cost of a potential cluster configuration based upon how evenly
1161    * distributed tables are.
1162    */
1163   static class TableSkewCostFunction extends CostFunction {
1164
1165     private static final String TABLE_SKEW_COST_KEY =
1166         "hbase.master.balancer.stochastic.tableSkewCost";
1167     private static final float DEFAULT_TABLE_SKEW_COST = 35;
1168 
1169     TableSkewCostFunction(Configuration conf) {
1170       super(conf);
1171       this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1172     }
1173
1174     @Override
1175     double cost() {
1176       double max = cluster.numRegions;
1177       double min = ((double) cluster.numRegions) / cluster.numServers;
1178       double value = 0;
1179
1180       for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1181         value += cluster.numMaxRegionsPerTable[i];
1182       }
1183
1184       return scale(min, max, value);
1185     }
1186   }
1187
1188   /**
1189    * Compute a cost of a potential cluster configuration based upon where
1190    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
1191    */
1192   static class LocalityCostFunction extends CostFunction {
1193
1194     private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1195     private static final float DEFAULT_LOCALITY_COST = 25;
1196
1197     private MasterServices services;
1198 
1199     LocalityCostFunction(Configuration conf, MasterServices srv) {
1200       super(conf);
1201       this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
1202       this.services = srv;
1203     }
1204
1205     void setServices(MasterServices srvc) {
1206       this.services = srvc;
1207     }
1208
1209     @Override
1210     double cost() {
1211       double max = 0;
1212       double cost = 0;
1213 
1214       // If there's no master so there's no way anything else works.
1215       if (this.services == null) {
1216         return cost;
1217       }
1218
1219       for (int i = 0; i < cluster.regionLocations.length; i++) {
1220         max += 1;
1221         int serverIndex = cluster.regionIndexToServerIndex[i];
1222         int[] regionLocations = cluster.regionLocations[i];
1223 
1224         // If we can't find where the data is getTopBlock returns null.
1225         // so count that as being the best possible.
1226         if (regionLocations == null) {
1227           continue;
1228         }
1229
1230         int index = -1;
1231         for (int j = 0; j < regionLocations.length; j++) {
1232           if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
1233             index = j;
1234             break;
1235           }
1236         }
1237 
1238         if (index < 0) {
1239           cost += 1;
1240         } else {
1241           cost += (1 - cluster.getLocalityOfRegion(i, index));
1242         }
1243       }
1244       return scale(0, max, cost);
1245     }
1246   }
1247
1248   /**
1249    * Base class the allows writing costs functions from rolling average of some
1250    * number from RegionLoad.
1251    */
1252   abstract static class CostFromRegionLoadFunction extends CostFunction {
1253
1254     private ClusterStatus clusterStatus = null;
1255     private Map<String, Deque<RegionLoad>> loads = null;
1256     private double[] stats = null;
1257     CostFromRegionLoadFunction(Configuration conf) {
1258       super(conf);
1259     }
1260
1261     void setClusterStatus(ClusterStatus status) {
1262       this.clusterStatus = status;
1263     }
1264 
1265     void setLoads(Map<String, Deque<RegionLoad>> l) {
1266       this.loads = l;
1267     }
1268
1269     @Override
1270     double cost() {
1271       if (clusterStatus == null || loads == null) {
1272         return 0;
1273       }
1274
1275       if (stats == null || stats.length != cluster.numServers) {
1276         stats = new double[cluster.numServers];
1277       }
1278
1279       for (int i =0; i < stats.length; i++) {
1280         //Cost this server has from RegionLoad
1281         long cost = 0;
1282
1283         // for every region on this server get the rl
1284         for(int regionIndex:cluster.regionsPerServer[i]) {
1285           Collection<RegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1286
1287           // Now if we found a region load get the type of cost that was requested.
1288           if (regionLoadList != null) {
1289             cost += getRegionLoadCost(regionLoadList);
1290           }
1291         }
1292
1293         // Add the total cost to the stats.
1294         stats[i] = cost;
1295       }
1296
1297       // Now return the scaled cost from data held in the stats object.
1298       return costFromArray(stats);
1299     }
1300
1301     protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
1302       double cost = 0;
1303
1304       for (RegionLoad rl : regionLoadList) {
1305         double toAdd = getCostFromRl(rl);
1306
1307         if (cost == 0) {
1308           cost = toAdd;
1309         } else {
1310           cost = (.5 * cost) + (.5 * toAdd);
1311         }
1312       }
1313
1314       return cost;
1315     }
1316
1317     protected abstract double getCostFromRl(RegionLoad rl);
1318   }
1319 
1320   /**
1321    * Compute the cost of total number of read requests  The more unbalanced the higher the
1322    * computed cost will be.  This uses a rolling average of regionload.
1323    */
1324
1325   static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
1326
1327     private static final String READ_REQUEST_COST_KEY =
1328         "hbase.master.balancer.stochastic.readRequestCost";
1329     private static final float DEFAULT_READ_REQUEST_COST = 5;
1330
1331     ReadRequestCostFunction(Configuration conf) {
1332       super(conf);
1333       this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1334     }
1335
1336
1337     @Override
1338     protected double getCostFromRl(RegionLoad rl) {
1339       return rl.getReadRequestsCount();
1340     }
1341   }
1342
1343   /**
1344    * Compute the cost of total number of write requests.  The more unbalanced the higher the
1345    * computed cost will be.  This uses a rolling average of regionload.
1346    */
1347   static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
1348
1349     private static final String WRITE_REQUEST_COST_KEY =
1350         "hbase.master.balancer.stochastic.writeRequestCost";
1351     private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1352 
1353     WriteRequestCostFunction(Configuration conf) {
1354       super(conf);
1355       this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1356     }
1357
1358     @Override
1359     protected double getCostFromRl(RegionLoad rl) {
1360       return rl.getWriteRequestsCount();
1361     }
1362   }
1363
1364   /**
1365    * A cost function for region replicas. We give a very high cost to hosting
1366    * replicas of the same region in the same host. We do not prevent the case
1367    * though, since if numReplicas > numRegionServers, we still want to keep the
1368    * replica open.
1369    */
1370   static class RegionReplicaHostCostFunction extends CostFunction {
1371     private static final String REGION_REPLICA_HOST_COST_KEY =
1372         "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1373     private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1374
1375     long maxCost = 0;
1376     long[] costsPerGroup; // group is either server, host or rack
1377     int[][] primariesOfRegionsPerGroup;
1378
1379     public RegionReplicaHostCostFunction(Configuration conf) {
1380       super(conf);
1381       this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1382         DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1383     }
1384
1385     @Override
1386     void init(Cluster cluster) {
1387       super.init(cluster);
1388       // max cost is the case where every region replica is hosted together regardless of host
1389       maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1390       costsPerGroup = new long[cluster.numHosts];
1391       primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1392           ? cluster.primariesOfRegionsPerHost
1393           : cluster.primariesOfRegionsPerServer;
1394       for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1395         costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1396       }
1397     }
1398 
1399     long getMaxCost(Cluster cluster) {
1400       if (!cluster.hasRegionReplicas) {
1401         return 0; // short circuit
1402       }
1403       // max cost is the case where every region replica is hosted together regardless of host
1404       int[] primariesOfRegions = new int[cluster.numRegions];
1405       System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1406           cluster.regions.length);
1407
1408       Arrays.sort(primariesOfRegions);
1409
1410       // compute numReplicas from the sorted array
1411       return costPerGroup(primariesOfRegions);
1412     }
1413
1414     @Override
1415     double cost() {
1416       if (maxCost <= 0) {
1417         return 0;
1418       }
1419
1420       long totalCost = 0;
1421       for (int i = 0 ; i < costsPerGroup.length; i++) {
1422         totalCost += costsPerGroup[i];
1423       }
1424       return scale(0, maxCost, totalCost);
1425     }
1426
1427     /**
1428      * For each primary region, it computes the total number of replicas in the array (numReplicas)
1429      * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1430      * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1431      * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1432      * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1433      * @return a sum of numReplicas-1 squared for each primary region in the group.
1434      */
1435     protected long costPerGroup(int[] primariesOfRegions) {
1436       long cost = 0;
1437       int currentPrimary = -1;
1438       int currentPrimaryIndex = -1;
1439       // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1440       // sharing the same primary will have consecutive numbers in the array.
1441       for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1442         int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1443         if (primary != currentPrimary) { // we see a new primary
1444           int numReplicas = j - currentPrimaryIndex;
1445           // square the cost
1446           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1447             cost += (numReplicas - 1) * (numReplicas - 1);
1448           }
1449           currentPrimary = primary;
1450           currentPrimaryIndex = j;
1451         }
1452       }
1453
1454       return cost;
1455     }
1456
1457     @Override
1458     protected void regionMoved(int region, int oldServer, int newServer) {
1459       if (maxCost <= 0) {
1460         return; // no need to compute
1461       }
1462       if (cluster.multiServersPerHost) {
1463         int oldHost = cluster.serverIndexToHostIndex[oldServer];
1464         int newHost = cluster.serverIndexToHostIndex[newServer];
1465         if (newHost != oldHost) {
1466           costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1467           costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1468         }
1469       } else {
1470         costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1471         costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1472       }
1473     }
1474   }
1475 
1476   /**
1477    * A cost function for region replicas for the rack distribution. We give a relatively high
1478    * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1479    * though.
1480    */
1481   static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1482     private static final String REGION_REPLICA_RACK_COST_KEY =
1483         "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1484     private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1485
1486     public RegionReplicaRackCostFunction(Configuration conf) {
1487       super(conf);
1488       this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1489         DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1490     }
1491 
1492     @Override
1493     void init(Cluster cluster) {
1494       this.cluster = cluster;
1495       if (cluster.numRacks <= 1) {
1496         maxCost = 0;
1497         return; // disabled for 1 rack
1498       }
1499       // max cost is the case where every region replica is hosted together regardless of rack
1500       maxCost = getMaxCost(cluster);
1501       costsPerGroup = new long[cluster.numRacks];
1502       for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1503         costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1504       }
1505     }
1506 
1507     @Override
1508     protected void regionMoved(int region, int oldServer, int newServer) {
1509       if (maxCost <= 0) {
1510         return; // no need to compute
1511       }
1512       int oldRack = cluster.serverIndexToRackIndex[oldServer];
1513       int newRack = cluster.serverIndexToRackIndex[newServer];
1514       if (newRack != oldRack) {
1515         costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1516         costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1517       }
1518     }
1519   }
1520
1521   /**
1522    * Compute the cost of total memstore size.  The more unbalanced the higher the
1523    * computed cost will be.  This uses a rolling average of regionload.
1524    */
1525   static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
1526
1527     private static final String MEMSTORE_SIZE_COST_KEY =
1528         "hbase.master.balancer.stochastic.memstoreSizeCost";
1529     private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1530
1531     MemstoreSizeCostFunction(Configuration conf) {
1532       super(conf);
1533       this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1534     }
1535
1536     @Override
1537     protected double getCostFromRl(RegionLoad rl) {
1538       return rl.getMemStoreSizeMB();
1539     }
1540   }
1541   /**
1542    * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1543    * computed cost will be.  This uses a rolling average of regionload.
1544    */
1545   static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1546
1547     private static final String STOREFILE_SIZE_COST_KEY =
1548         "hbase.master.balancer.stochastic.storefileSizeCost";
1549     private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1550
1551     StoreFileCostFunction(Configuration conf) {
1552       super(conf);
1553       this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1554     }
1555
1556     @Override
1557     protected double getCostFromRl(RegionLoad rl) {
1558       return rl.getStorefileSizeMB();
1559     }
1560   }
1561
1562   /**
1563    * A helper function to compose the attribute name from tablename and costfunction name
1564    */
1565   public static String composeAttributeName(String tableName, String costFunctionName) {
1566     return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1567   }
1568 }