1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import java.util.ArrayDeque;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.Deque;
24 import java.util.HashMap;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.Random;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.ClusterStatus;
36 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
37 import org.apache.hadoop.hbase.HRegionInfo;
38 import org.apache.hadoop.hbase.RegionLoad;
39 import org.apache.hadoop.hbase.ServerLoad;
40 import org.apache.hadoop.hbase.ServerName;
41 import org.apache.hadoop.hbase.master.MasterServices;
42 import org.apache.hadoop.hbase.master.RegionPlan;
43 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
44 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
45 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
46 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
47 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
95 public class StochasticLoadBalancer extends BaseLoadBalancer {
96
97 protected static final String STEPS_PER_REGION_KEY =
98 "hbase.master.balancer.stochastic.stepsPerRegion";
99 protected static final String MAX_STEPS_KEY =
100 "hbase.master.balancer.stochastic.maxSteps";
101 protected static final String MAX_RUNNING_TIME_KEY =
102 "hbase.master.balancer.stochastic.maxRunningTime";
103 protected static final String KEEP_REGION_LOADS =
104 "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
105
106 private static final Random RANDOM = new Random(System.currentTimeMillis());
107 private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
108
109 Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
110
111
112 private int maxSteps = 1000000;
113 private int stepsPerRegion = 800;
114 private long maxRunningTime = 30 * 1000 * 1;
115 private int numRegionLoadsToRemember = 15;
116
117 private CandidateGenerator[] candidateGenerators;
118 private CostFromRegionLoadFunction[] regionLoadFunctions;
119 private CostFunction[] costFunctions;
120
121
122 private LocalityBasedCandidateGenerator localityCandidateGenerator;
123 private LocalityCostFunction localityCost;
124 private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
125 private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
126
127 @Override
128 public void onConfigurationChange(Configuration conf) {
129 setConf(conf);
130 }
131
132 @Override
133 public synchronized void setConf(Configuration conf) {
134 super.setConf(conf);
135 LOG.info("loading config");
136
137 maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
138
139 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
140 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
141
142 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
143
144 if (localityCandidateGenerator == null) {
145 localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
146 }
147 localityCost = new LocalityCostFunction(conf, services);
148
149 if (candidateGenerators == null) {
150 candidateGenerators = new CandidateGenerator[] {
151 new RandomCandidateGenerator(),
152 new LoadCandidateGenerator(),
153 localityCandidateGenerator,
154 new RegionReplicaRackCandidateGenerator(),
155 };
156 }
157
158 regionLoadFunctions = new CostFromRegionLoadFunction[] {
159 new ReadRequestCostFunction(conf),
160 new WriteRequestCostFunction(conf),
161 new MemstoreSizeCostFunction(conf),
162 new StoreFileCostFunction(conf)
163 };
164
165 regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
166 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
167
168 costFunctions = new CostFunction[]{
169 new RegionCountSkewCostFunction(conf),
170 new MoveCostFunction(conf),
171 localityCost,
172 new TableSkewCostFunction(conf),
173 regionReplicaHostCostFunction,
174 regionReplicaRackCostFunction,
175 regionLoadFunctions[0],
176 regionLoadFunctions[1],
177 regionLoadFunctions[2],
178 regionLoadFunctions[3],
179 };
180 }
181
182 @Override
183 protected void setSlop(Configuration conf) {
184 this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
185 }
186
187 @Override
188 public synchronized void setClusterStatus(ClusterStatus st) {
189 super.setClusterStatus(st);
190 updateRegionLoad();
191 for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
192 cost.setClusterStatus(st);
193 }
194 }
195
196 @Override
197 public synchronized void setMasterServices(MasterServices masterServices) {
198 super.setMasterServices(masterServices);
199 this.localityCost.setServices(masterServices);
200 this.localityCandidateGenerator.setServices(masterServices);
201
202 }
203
204 @Override
205 protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
206 regionReplicaHostCostFunction.init(c);
207 if (regionReplicaHostCostFunction.cost() > 0) return true;
208 regionReplicaRackCostFunction.init(c);
209 if (regionReplicaRackCostFunction.cost() > 0) return true;
210 return false;
211 }
212
213
214
215
216
217 @Override
218 public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
219 List<HRegionInfo>> clusterState) {
220 List<RegionPlan> plans = balanceMasterRegions(clusterState);
221 if (plans != null || clusterState == null || clusterState.size() <= 1) {
222 return plans;
223 }
224 if (masterServerName != null && clusterState.containsKey(masterServerName)) {
225 if (clusterState.size() <= 2) {
226 return null;
227 }
228 clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
229 clusterState.remove(masterServerName);
230 }
231
232
233
234
235
236 RegionLocationFinder finder = null;
237 if (this.localityCost != null && this.localityCost.getMultiplier() > 0) {
238 finder = this.regionFinder;
239 }
240
241
242
243
244 Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
245 if (!needsBalance(cluster)) {
246 return null;
247 }
248
249 long startTime = EnvironmentEdgeManager.currentTime();
250
251 initCosts(cluster);
252
253 double currentCost = computeCost(cluster, Double.MAX_VALUE);
254
255 double initCost = currentCost;
256 double newCost = currentCost;
257
258 long computedMaxSteps = Math.min(this.maxSteps,
259 ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
260
261 long step;
262
263 for (step = 0; step < computedMaxSteps; step++) {
264 int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
265 CandidateGenerator p = candidateGenerators[generatorIdx];
266 Cluster.Action action = p.generate(cluster);
267
268 if (action.type == Type.NULL) {
269 continue;
270 }
271
272 cluster.doAction(action);
273 updateCostsWithAction(cluster, action);
274
275 newCost = computeCost(cluster, currentCost);
276
277
278 if (newCost < currentCost) {
279 currentCost = newCost;
280 } else {
281
282
283 Action undoAction = action.undoAction();
284 cluster.doAction(undoAction);
285 updateCostsWithAction(cluster, undoAction);
286 }
287
288 if (EnvironmentEdgeManager.currentTime() - startTime >
289 maxRunningTime) {
290 break;
291 }
292 }
293
294 long endTime = EnvironmentEdgeManager.currentTime();
295
296 metricsBalancer.balanceCluster(endTime - startTime);
297
298 if (initCost > currentCost) {
299 plans = createRegionPlans(cluster);
300 if (LOG.isDebugEnabled()) {
301 LOG.debug("Finished computing new load balance plan. Computation took "
302 + (endTime - startTime) + "ms to try " + step
303 + " different iterations. Found a solution that moves "
304 + plans.size() + " regions; Going from a computed cost of "
305 + initCost + " to a new cost of " + currentCost);
306 }
307 return plans;
308 }
309 if (LOG.isDebugEnabled()) {
310 LOG.debug("Could not find a better load balance plan. Tried "
311 + step + " different configurations in " + (endTime - startTime)
312 + "ms, and did not find anything with a computed cost less than " + initCost);
313 }
314 return null;
315 }
316
317
318
319
320
321
322
323
324 private List<RegionPlan> createRegionPlans(Cluster cluster) {
325 List<RegionPlan> plans = new LinkedList<RegionPlan>();
326 for (int regionIndex = 0;
327 regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
328 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
329 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
330
331 if (initialServerIndex != newServerIndex) {
332 HRegionInfo region = cluster.regions[regionIndex];
333 ServerName initialServer = cluster.servers[initialServerIndex];
334 ServerName newServer = cluster.servers[newServerIndex];
335
336 if (LOG.isTraceEnabled()) {
337 LOG.trace("Moving Region " + region.getEncodedName() + " from server "
338 + initialServer.getHostname() + " to " + newServer.getHostname());
339 }
340 RegionPlan rp = new RegionPlan(region, initialServer, newServer);
341 plans.add(rp);
342 }
343 }
344 return plans;
345 }
346
347
348
349
350 private synchronized void updateRegionLoad() {
351
352
353 Map<String, Deque<RegionLoad>> oldLoads = loads;
354 loads = new HashMap<String, Deque<RegionLoad>>();
355
356 for (ServerName sn : clusterStatus.getServers()) {
357 ServerLoad sl = clusterStatus.getLoad(sn);
358 if (sl == null) {
359 continue;
360 }
361 for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
362 Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
363 if (rLoads == null) {
364
365 rLoads = new ArrayDeque<RegionLoad>();
366 } else if (rLoads.size() >= numRegionLoadsToRemember) {
367 rLoads.remove();
368 }
369 rLoads.add(entry.getValue());
370 loads.put(Bytes.toString(entry.getKey()), rLoads);
371
372 }
373 }
374
375 for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
376 cost.setLoads(loads);
377 }
378 }
379
380 protected void initCosts(Cluster cluster) {
381 for (CostFunction c:costFunctions) {
382 c.init(cluster);
383 }
384 }
385
386 protected void updateCostsWithAction(Cluster cluster, Action action) {
387 for (CostFunction c : costFunctions) {
388 c.postAction(action);
389 }
390 }
391
392
393
394
395
396
397
398
399
400
401 protected double computeCost(Cluster cluster, double previousCost) {
402 double total = 0;
403
404 for (CostFunction c:costFunctions) {
405 if (c.getMultiplier() <= 0) {
406 continue;
407 }
408
409 total += c.getMultiplier() * c.cost();
410
411 if (total > previousCost) {
412 return total;
413 }
414 }
415 return total;
416 }
417
418
419 abstract static class CandidateGenerator {
420 abstract Cluster.Action generate(Cluster cluster);
421
422
423
424
425
426
427
428
429
430
431
432
433
434 protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
435
436 if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
437
438 return -1;
439 }
440 int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
441 return cluster.regionsPerServer[server][rand];
442
443 }
444 protected int pickRandomServer(Cluster cluster) {
445 if (cluster.numServers < 1) {
446 return -1;
447 }
448
449 return RANDOM.nextInt(cluster.numServers);
450 }
451
452 protected int pickRandomRack(Cluster cluster) {
453 if (cluster.numRacks < 1) {
454 return -1;
455 }
456
457 return RANDOM.nextInt(cluster.numRacks);
458 }
459
460 protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
461 if (cluster.numServers < 2) {
462 return -1;
463 }
464 while (true) {
465 int otherServerIndex = pickRandomServer(cluster);
466 if (otherServerIndex != serverIndex) {
467 return otherServerIndex;
468 }
469 }
470 }
471
472 protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
473 if (cluster.numRacks < 2) {
474 return -1;
475 }
476 while (true) {
477 int otherRackIndex = pickRandomRack(cluster);
478 if (otherRackIndex != rackIndex) {
479 return otherRackIndex;
480 }
481 }
482 }
483
484 protected Cluster.Action pickRandomRegions(Cluster cluster,
485 int thisServer,
486 int otherServer) {
487 if (thisServer < 0 || otherServer < 0) {
488 return Cluster.NullAction;
489 }
490
491
492 int thisRegionCount = cluster.getNumRegions(thisServer);
493 int otherRegionCount = cluster.getNumRegions(otherServer);
494
495
496 double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
497 double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
498
499 int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
500 int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
501
502 return getAction(thisServer, thisRegion, otherServer, otherRegion);
503 }
504
505 protected Cluster.Action getAction (int fromServer, int fromRegion,
506 int toServer, int toRegion) {
507 if (fromServer < 0 || toServer < 0) {
508 return Cluster.NullAction;
509 }
510 if (fromRegion > 0 && toRegion > 0) {
511 return new Cluster.SwapRegionsAction(fromServer, fromRegion,
512 toServer, toRegion);
513 } else if (fromRegion > 0) {
514 return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
515 } else if (toRegion > 0) {
516 return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
517 } else {
518 return Cluster.NullAction;
519 }
520 }
521 }
522
523 static class RandomCandidateGenerator extends CandidateGenerator {
524
525 @Override
526 Cluster.Action generate(Cluster cluster) {
527
528 int thisServer = pickRandomServer(cluster);
529
530
531 int otherServer = pickOtherRandomServer(cluster, thisServer);
532
533 return pickRandomRegions(cluster, thisServer, otherServer);
534 }
535 }
536
537 static class LoadCandidateGenerator extends CandidateGenerator {
538
539 @Override
540 Cluster.Action generate(Cluster cluster) {
541 cluster.sortServersByRegionCount();
542 int thisServer = pickMostLoadedServer(cluster, -1);
543 int otherServer = pickLeastLoadedServer(cluster, thisServer);
544
545 return pickRandomRegions(cluster, thisServer, otherServer);
546 }
547
548 private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
549 Integer[] servers = cluster.serverIndicesSortedByRegionCount;
550
551 int index = 0;
552 while (servers[index] == null || servers[index] == thisServer) {
553 index++;
554 if (index == servers.length) {
555 return -1;
556 }
557 }
558 return servers[index];
559 }
560
561 private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
562 Integer[] servers = cluster.serverIndicesSortedByRegionCount;
563
564 int index = servers.length - 1;
565 while (servers[index] == null || servers[index] == thisServer) {
566 index--;
567 if (index < 0) {
568 return -1;
569 }
570 }
571 return servers[index];
572 }
573 }
574
575 static class LocalityBasedCandidateGenerator extends CandidateGenerator {
576
577 private MasterServices masterServices;
578
579 LocalityBasedCandidateGenerator(MasterServices masterServices) {
580 this.masterServices = masterServices;
581 }
582
583 @Override
584 Cluster.Action generate(Cluster cluster) {
585 if (this.masterServices == null) {
586 return Cluster.NullAction;
587 }
588
589 int thisServer = pickRandomServer(cluster);
590
591
592 int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
593
594 if (thisRegion == -1) {
595 return Cluster.NullAction;
596 }
597
598
599 int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion);
600
601 if (otherServer == -1) {
602 return Cluster.NullAction;
603 }
604
605
606 int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
607
608 return getAction(thisServer, thisRegion, otherServer, otherRegion);
609 }
610
611 private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
612 int[] regionLocations = cluster.regionLocations[thisRegion];
613
614 if (regionLocations == null || regionLocations.length <= 1) {
615 return pickOtherRandomServer(cluster, thisServer);
616 }
617
618 for (int loc : regionLocations) {
619 if (loc >= 0 && loc != thisServer) {
620 return loc;
621 }
622 }
623
624
625 return pickOtherRandomServer(cluster, thisServer);
626 }
627
628 void setServices(MasterServices services) {
629 this.masterServices = services;
630 }
631 }
632
633
634
635
636
637 static class RegionReplicaCandidateGenerator extends CandidateGenerator {
638
639 RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
640
641
642
643
644
645
646
647
648
649
650 int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
651 , int[] regionIndexToPrimaryIndex) {
652 int currentPrimary = -1;
653 int currentPrimaryIndex = -1;
654 int selectedPrimaryIndex = -1;
655 double currentLargestRandom = -1;
656
657
658
659 for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
660 int primary = j < primariesOfRegionsPerGroup.length
661 ? primariesOfRegionsPerGroup[j] : -1;
662 if (primary != currentPrimary) {
663 int numReplicas = j - currentPrimaryIndex;
664 if (numReplicas > 1) {
665
666 double currentRandom = RANDOM.nextDouble();
667
668
669 if (currentRandom > currentLargestRandom) {
670 selectedPrimaryIndex = currentPrimary;
671 currentLargestRandom = currentRandom;
672 }
673 }
674 currentPrimary = primary;
675 currentPrimaryIndex = j;
676 }
677 }
678
679
680
681 for (int j = 0; j < regionsPerGroup.length; j++) {
682 int regionIndex = regionsPerGroup[j];
683 if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
684
685 if (selectedPrimaryIndex != regionIndex) {
686 return regionIndex;
687 }
688 }
689 }
690 return -1;
691 }
692
693 @Override
694 Cluster.Action generate(Cluster cluster) {
695 int serverIndex = pickRandomServer(cluster);
696 if (cluster.numServers <= 1 || serverIndex == -1) {
697 return Cluster.NullAction;
698 }
699
700 int regionIndex = selectCoHostedRegionPerGroup(
701 cluster.primariesOfRegionsPerServer[serverIndex],
702 cluster.regionsPerServer[serverIndex],
703 cluster.regionIndexToPrimaryIndex);
704
705
706 if (regionIndex == -1) {
707
708 return randomGenerator.generate(cluster);
709 }
710
711 int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
712 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
713 return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
714 }
715 }
716
717
718
719
720
721 static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
722 @Override
723 Cluster.Action generate(Cluster cluster) {
724 int rackIndex = pickRandomRack(cluster);
725 if (cluster.numRacks <= 1 || rackIndex == -1) {
726 return super.generate(cluster);
727 }
728
729 int regionIndex = selectCoHostedRegionPerGroup(
730 cluster.primariesOfRegionsPerRack[rackIndex],
731 cluster.regionsPerRack[rackIndex],
732 cluster.regionIndexToPrimaryIndex);
733
734
735 if (regionIndex == -1) {
736
737 return randomGenerator.generate(cluster);
738 }
739
740 int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
741 int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
742
743 int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
744 int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
745 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
746 return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
747 }
748 }
749
750
751
752
753 abstract static class CostFunction {
754
755 private float multiplier = 0;
756
757 protected Cluster cluster;
758
759 CostFunction(Configuration c) {
760
761 }
762
763 float getMultiplier() {
764 return multiplier;
765 }
766
767 void setMultiplier(float m) {
768 this.multiplier = m;
769 }
770
771
772
773
774 void init(Cluster cluster) {
775 this.cluster = cluster;
776 }
777
778
779
780
781
782 void postAction(Action action) {
783 switch (action.type) {
784 case NULL: break;
785 case ASSIGN_REGION:
786 AssignRegionAction ar = (AssignRegionAction) action;
787 regionMoved(ar.region, -1, ar.server);
788 break;
789 case MOVE_REGION:
790 MoveRegionAction mra = (MoveRegionAction) action;
791 regionMoved(mra.region, mra.fromServer, mra.toServer);
792 break;
793 case SWAP_REGIONS:
794 SwapRegionsAction a = (SwapRegionsAction) action;
795 regionMoved(a.fromRegion, a.fromServer, a.toServer);
796 regionMoved(a.toRegion, a.toServer, a.fromServer);
797 break;
798 default:
799 throw new RuntimeException("Uknown action:" + action.type);
800 }
801 }
802
803 protected void regionMoved(int region, int oldServer, int newServer) {
804 }
805
806 abstract double cost();
807
808
809
810
811
812
813
814
815
816 protected double costFromArray(double[] stats) {
817 double totalCost = 0;
818 double total = getSum(stats);
819
820 double count = stats.length;
821 double mean = total/count;
822
823
824
825 double max = ((count - 1) * mean) + (total - mean);
826
827
828 double min;
829 if (count > total) {
830 min = ((count - total) * mean) + ((1 - mean) * total);
831 } else {
832
833 int numHigh = (int) (total - (Math.floor(mean) * count));
834 int numLow = (int) (count - numHigh);
835
836 min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
837
838 }
839 min = Math.max(0, min);
840 for (int i=0; i<stats.length; i++) {
841 double n = stats[i];
842 double diff = Math.abs(mean - n);
843 totalCost += diff;
844 }
845
846 double scaled = scale(min, max, totalCost);
847 return scaled;
848 }
849
850 private double getSum(double[] stats) {
851 double total = 0;
852 for(double s:stats) {
853 total += s;
854 }
855 return total;
856 }
857
858
859
860
861
862
863
864
865
866 protected double scale(double min, double max, double value) {
867 if (max <= min || value <= min) {
868 return 0;
869 }
870 if ((max - min) == 0) return 0;
871
872 return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
873 }
874 }
875
876
877
878
879
880 static class MoveCostFunction extends CostFunction {
881 private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
882 private static final String MAX_MOVES_PERCENT_KEY =
883 "hbase.master.balancer.stochastic.maxMovePercent";
884 private static final float DEFAULT_MOVE_COST = 100;
885 private static final int DEFAULT_MAX_MOVES = 600;
886 private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
887
888 private final float maxMovesPercent;
889
890 MoveCostFunction(Configuration conf) {
891 super(conf);
892
893
894
895 this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
896
897 maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
898 }
899
900 @Override
901 double cost() {
902
903 int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
904 DEFAULT_MAX_MOVES);
905
906 double moveCost = cluster.numMovedRegions;
907
908
909
910 if (moveCost > maxMoves) {
911 return 1000000;
912 }
913
914 return scale(0, cluster.numRegions, moveCost);
915 }
916 }
917
918
919
920
921
922 static class RegionCountSkewCostFunction extends CostFunction {
923 private static final String REGION_COUNT_SKEW_COST_KEY =
924 "hbase.master.balancer.stochastic.regionCountCost";
925 private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
926
927 private double[] stats = null;
928
929 RegionCountSkewCostFunction(Configuration conf) {
930 super(conf);
931
932 this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
933 }
934
935 @Override
936 double cost() {
937 if (stats == null || stats.length != cluster.numServers) {
938 stats = new double[cluster.numServers];
939 }
940
941 for (int i =0; i < cluster.numServers; i++) {
942 stats[i] = cluster.regionsPerServer[i].length;
943 }
944
945 return costFromArray(stats);
946 }
947 }
948
949
950
951
952
953 static class TableSkewCostFunction extends CostFunction {
954
955 private static final String TABLE_SKEW_COST_KEY =
956 "hbase.master.balancer.stochastic.tableSkewCost";
957 private static final float DEFAULT_TABLE_SKEW_COST = 35;
958
959 TableSkewCostFunction(Configuration conf) {
960 super(conf);
961 this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
962 }
963
964 @Override
965 double cost() {
966 double max = cluster.numRegions;
967 double min = ((double) cluster.numRegions) / cluster.numServers;
968 double value = 0;
969
970 for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
971 value += cluster.numMaxRegionsPerTable[i];
972 }
973
974 return scale(min, max, value);
975 }
976 }
977
978
979
980
981
982 static class LocalityCostFunction extends CostFunction {
983
984 private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
985 private static final float DEFAULT_LOCALITY_COST = 25;
986
987 private MasterServices services;
988
989 LocalityCostFunction(Configuration conf, MasterServices srv) {
990 super(conf);
991 this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
992 this.services = srv;
993 }
994
995 void setServices(MasterServices srvc) {
996 this.services = srvc;
997 }
998
999 @Override
1000 double cost() {
1001 double max = 0;
1002 double cost = 0;
1003
1004
1005 if (this.services == null) {
1006 return cost;
1007 }
1008
1009 for (int i = 0; i < cluster.regionLocations.length; i++) {
1010 max += 1;
1011 int serverIndex = cluster.regionIndexToServerIndex[i];
1012 int[] regionLocations = cluster.regionLocations[i];
1013
1014
1015
1016 if (regionLocations == null) {
1017 continue;
1018 }
1019
1020 int index = -1;
1021 for (int j = 0; j < regionLocations.length; j++) {
1022 if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
1023 index = j;
1024 break;
1025 }
1026 }
1027
1028 if (index < 0) {
1029 if (regionLocations.length > 0) {
1030 cost += 1;
1031 }
1032 } else {
1033 cost += (double) index / (double) regionLocations.length;
1034 }
1035 }
1036 return scale(0, max, cost);
1037 }
1038 }
1039
1040
1041
1042
1043
1044 abstract static class CostFromRegionLoadFunction extends CostFunction {
1045
1046 private ClusterStatus clusterStatus = null;
1047 private Map<String, Deque<RegionLoad>> loads = null;
1048 private double[] stats = null;
1049 CostFromRegionLoadFunction(Configuration conf) {
1050 super(conf);
1051 }
1052
1053 void setClusterStatus(ClusterStatus status) {
1054 this.clusterStatus = status;
1055 }
1056
1057 void setLoads(Map<String, Deque<RegionLoad>> l) {
1058 this.loads = l;
1059 }
1060
1061 @Override
1062 double cost() {
1063 if (clusterStatus == null || loads == null) {
1064 return 0;
1065 }
1066
1067 if (stats == null || stats.length != cluster.numServers) {
1068 stats = new double[cluster.numServers];
1069 }
1070
1071 for (int i =0; i < stats.length; i++) {
1072
1073 long cost = 0;
1074
1075
1076 for(int regionIndex:cluster.regionsPerServer[i]) {
1077 Collection<RegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
1078
1079
1080 if (regionLoadList != null) {
1081 cost += getRegionLoadCost(regionLoadList);
1082 }
1083 }
1084
1085
1086 stats[i] = cost;
1087 }
1088
1089
1090 return costFromArray(stats);
1091 }
1092
1093 protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
1094 double cost = 0;
1095
1096 for (RegionLoad rl : regionLoadList) {
1097 double toAdd = getCostFromRl(rl);
1098
1099 if (cost == 0) {
1100 cost = toAdd;
1101 } else {
1102 cost = (.5 * cost) + (.5 * toAdd);
1103 }
1104 }
1105
1106 return cost;
1107 }
1108
1109 protected abstract double getCostFromRl(RegionLoad rl);
1110 }
1111
1112
1113
1114
1115
1116
1117 static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
1118
1119 private static final String READ_REQUEST_COST_KEY =
1120 "hbase.master.balancer.stochastic.readRequestCost";
1121 private static final float DEFAULT_READ_REQUEST_COST = 5;
1122
1123 ReadRequestCostFunction(Configuration conf) {
1124 super(conf);
1125 this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1126 }
1127
1128
1129 @Override
1130 protected double getCostFromRl(RegionLoad rl) {
1131 return rl.getReadRequestsCount();
1132 }
1133 }
1134
1135
1136
1137
1138
1139 static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
1140
1141 private static final String WRITE_REQUEST_COST_KEY =
1142 "hbase.master.balancer.stochastic.writeRequestCost";
1143 private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1144
1145 WriteRequestCostFunction(Configuration conf) {
1146 super(conf);
1147 this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1148 }
1149
1150 @Override
1151 protected double getCostFromRl(RegionLoad rl) {
1152 return rl.getWriteRequestsCount();
1153 }
1154 }
1155
1156
1157
1158
1159
1160
1161
1162 static class RegionReplicaHostCostFunction extends CostFunction {
1163 private static final String REGION_REPLICA_HOST_COST_KEY =
1164 "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1165 private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1166
1167 long maxCost = 0;
1168 long[] costsPerGroup;
1169 int[][] primariesOfRegionsPerGroup;
1170
1171 public RegionReplicaHostCostFunction(Configuration conf) {
1172 super(conf);
1173 this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1174 DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1175 }
1176
1177 @Override
1178 void init(Cluster cluster) {
1179 super.init(cluster);
1180
1181 maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1182 costsPerGroup = new long[cluster.numHosts];
1183 primariesOfRegionsPerGroup = cluster.multiServersPerHost
1184 ? cluster.primariesOfRegionsPerHost
1185 : cluster.primariesOfRegionsPerServer;
1186 for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1187 costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1188 }
1189 }
1190
1191 long getMaxCost(Cluster cluster) {
1192 if (!cluster.hasRegionReplicas) {
1193 return 0;
1194 }
1195
1196 int[] primariesOfRegions = new int[cluster.numRegions];
1197 System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1198 cluster.regions.length);
1199
1200 Arrays.sort(primariesOfRegions);
1201
1202
1203 return costPerGroup(primariesOfRegions);
1204 }
1205
1206 @Override
1207 double cost() {
1208 if (maxCost <= 0) {
1209 return 0;
1210 }
1211
1212 long totalCost = 0;
1213 for (int i = 0 ; i < costsPerGroup.length; i++) {
1214 totalCost += costsPerGroup[i];
1215 }
1216 return scale(0, maxCost, totalCost);
1217 }
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227 protected long costPerGroup(int[] primariesOfRegions) {
1228 long cost = 0;
1229 int currentPrimary = -1;
1230 int currentPrimaryIndex = -1;
1231
1232
1233 for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1234 int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1235 if (primary != currentPrimary) {
1236 int numReplicas = j - currentPrimaryIndex;
1237
1238 if (numReplicas > 1) {
1239 cost += (numReplicas - 1) * (numReplicas - 1);
1240 }
1241 currentPrimary = primary;
1242 currentPrimaryIndex = j;
1243 }
1244 }
1245
1246 return cost;
1247 }
1248
1249 @Override
1250 protected void regionMoved(int region, int oldServer, int newServer) {
1251 if (maxCost <= 0) {
1252 return;
1253 }
1254 if (cluster.multiServersPerHost) {
1255 int oldHost = cluster.serverIndexToHostIndex[oldServer];
1256 int newHost = cluster.serverIndexToHostIndex[newServer];
1257 if (newHost != oldHost) {
1258 costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1259 costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1260 }
1261 } else {
1262 costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1263 costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1264 }
1265 }
1266 }
1267
1268
1269
1270
1271
1272
1273 static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1274 private static final String REGION_REPLICA_RACK_COST_KEY =
1275 "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1276 private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1277
1278 public RegionReplicaRackCostFunction(Configuration conf) {
1279 super(conf);
1280 this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1281 }
1282
1283 @Override
1284 void init(Cluster cluster) {
1285 this.cluster = cluster;
1286 if (cluster.numRacks <= 1) {
1287 maxCost = 0;
1288 return;
1289 }
1290
1291 maxCost = getMaxCost(cluster);
1292 costsPerGroup = new long[cluster.numRacks];
1293 for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1294 costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1295 }
1296 }
1297
1298 @Override
1299 protected void regionMoved(int region, int oldServer, int newServer) {
1300 if (maxCost <= 0) {
1301 return;
1302 }
1303 int oldRack = cluster.serverIndexToRackIndex[oldServer];
1304 int newRack = cluster.serverIndexToRackIndex[newServer];
1305 if (newRack != oldRack) {
1306 costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1307 costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1308 }
1309 }
1310 }
1311
1312
1313
1314
1315
1316 static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
1317
1318 private static final String MEMSTORE_SIZE_COST_KEY =
1319 "hbase.master.balancer.stochastic.memstoreSizeCost";
1320 private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1321
1322 MemstoreSizeCostFunction(Configuration conf) {
1323 super(conf);
1324 this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1325 }
1326
1327 @Override
1328 protected double getCostFromRl(RegionLoad rl) {
1329 return rl.getMemStoreSizeMB();
1330 }
1331 }
1332
1333
1334
1335
1336 static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1337
1338 private static final String STOREFILE_SIZE_COST_KEY =
1339 "hbase.master.balancer.stochastic.storefileSizeCost";
1340 private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1341
1342 StoreFileCostFunction(Configuration conf) {
1343 super(conf);
1344 this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1345 }
1346
1347 @Override
1348 protected double getCostFromRl(RegionLoad rl) {
1349 return rl.getStorefileSizeMB();
1350 }
1351 }
1352 }