1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import com.google.common.annotations.VisibleForTesting;
21 import java.util.ArrayDeque;
22 import java.util.Arrays;
23 import java.util.Collection;
24 import java.util.Deque;
25 import java.util.HashMap;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Map.Entry;
30 import java.util.Random;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.ClusterStatus;
37 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
38 import org.apache.hadoop.hbase.HRegionInfo;
39 import org.apache.hadoop.hbase.RegionLoad;
40 import org.apache.hadoop.hbase.ServerLoad;
41 import org.apache.hadoop.hbase.ServerName;
42 import org.apache.hadoop.hbase.master.MasterServices;
43 import org.apache.hadoop.hbase.master.RegionPlan;
44 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
45 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
46 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
47 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
48 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
96 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
97 justification="Complaint is about costFunctions not being synchronized; not end of the world")
98 public class StochasticLoadBalancer extends BaseLoadBalancer {
99
// Configuration key read in setConf() into stepsPerRegion.
protected static final String STEPS_PER_REGION_KEY =
    "hbase.master.balancer.stochastic.stepsPerRegion";
// Configuration key read in setConf() into maxSteps (upper bound on search steps).
protected static final String MAX_STEPS_KEY =
    "hbase.master.balancer.stochastic.maxSteps";
// Configuration key read in setConf() into maxRunningTime.
protected static final String MAX_RUNNING_TIME_KEY =
    "hbase.master.balancer.stochastic.maxRunningTime";
// Configuration key read in setConf() into numRegionLoadsToRemember.
protected static final String KEEP_REGION_LOADS =
    "hbase.master.balancer.stochastic.numRegionLoadsToRemember";

// Shared random source for every candidate generator; seeded from the wall clock.
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);

// Recent RegionLoad samples per region, keyed by the stringified region-load map key;
// rebuilt on every updateRegionLoad() call.
Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();

// values are defaults
// Hard cap on the number of search steps in balanceCluster().
private int maxSteps = 1000000;
// Multiplied by numRegions * numServers to size the search; capped by maxSteps.
private int stepsPerRegion = 800;
// Search budget, compared against an EnvironmentEdgeManager.currentTime() delta
// (logged as "ms" in balanceCluster()).
private long maxRunningTime = 30 * 1000 * 1;
// Maximum number of RegionLoad samples kept per region in 'loads'.
private int numRegionLoadsToRemember = 15;

// Generators that nextAction() picks from uniformly at random.
private CandidateGenerator[] candidateGenerators;
// The load-based cost functions; also present in costFunctions. Kept separately so
// cluster status and loads can be pushed into them.
private CostFromRegionLoadFunction[] regionLoadFunctions;

// Every cost function that contributes to computeCost().
private CostFunction[] costFunctions;

// Kept as fields so that master services can be injected after construction and so
// that areSomeRegionReplicasColocated() can query the replica cost functions directly.
private LocalityBasedCandidateGenerator localityCandidateGenerator;
private LocalityCostFunction localityCost;
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
131
132 @Override
133 public void onConfigurationChange(Configuration conf) {
134 setConf(conf);
135 }
136
137 @Override
138 public synchronized void setConf(Configuration conf) {
139 super.setConf(conf);
140 LOG.info("loading config");
141
142 maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
143
144 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
145 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
146
147 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
148
149 if (localityCandidateGenerator == null) {
150 localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
151 }
152 localityCost = new LocalityCostFunction(conf, services);
153
154 if (candidateGenerators == null) {
155 candidateGenerators = new CandidateGenerator[] {
156 new RandomCandidateGenerator(),
157 new LoadCandidateGenerator(),
158 localityCandidateGenerator,
159 new RegionReplicaRackCandidateGenerator(),
160 };
161 }
162
163 regionLoadFunctions = new CostFromRegionLoadFunction[] {
164 new ReadRequestCostFunction(conf),
165 new WriteRequestCostFunction(conf),
166 new MemstoreSizeCostFunction(conf),
167 new StoreFileCostFunction(conf)
168 };
169
170 regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
171 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
172
173 costFunctions = new CostFunction[]{
174 new RegionCountSkewCostFunction(conf),
175 new PrimaryRegionCountSkewCostFunction(conf),
176 new MoveCostFunction(conf),
177 localityCost,
178 new TableSkewCostFunction(conf),
179 regionReplicaHostCostFunction,
180 regionReplicaRackCostFunction,
181 regionLoadFunctions[0],
182 regionLoadFunctions[1],
183 regionLoadFunctions[2],
184 regionLoadFunctions[3],
185 };
186 }
187
188 @Override
189 protected void setSlop(Configuration conf) {
190 this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
191 }
192
193 @Override
194 public synchronized void setClusterStatus(ClusterStatus st) {
195 super.setClusterStatus(st);
196 updateRegionLoad();
197 for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
198 cost.setClusterStatus(st);
199 }
200 }
201
202 @Override
203 public synchronized void setMasterServices(MasterServices masterServices) {
204 super.setMasterServices(masterServices);
205 this.localityCost.setServices(masterServices);
206 this.localityCandidateGenerator.setServices(masterServices);
207
208 }
209
210 @Override
211 protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
212 regionReplicaHostCostFunction.init(c);
213 if (regionReplicaHostCostFunction.cost() > 0) return true;
214 regionReplicaRackCostFunction.init(c);
215 if (regionReplicaRackCostFunction.cost() > 0) return true;
216 return false;
217 }
218
219 @VisibleForTesting
220 Cluster.Action nextAction(Cluster cluster) {
221 return candidateGenerators[(RANDOM.nextInt(candidateGenerators.length))]
222 .generate(cluster);
223 }
224
225
226
227
228
/**
 * Searches for a set of region moves that lowers the weighted cost of the cluster.
 * Starting from the given assignment, it repeatedly applies a randomly generated
 * action, keeps it only if the total cost strictly decreases, and undoes it otherwise.
 *
 * @param clusterState current assignment of regions to servers
 * @return the plans returned by balanceMasterRegions() when those are non-null;
 *         otherwise the plans that realise the cheaper assignment found, or null
 *         when balancing is skipped or no cheaper assignment was found
 */
@Override
public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
  List<HRegionInfo>> clusterState) {
  List<RegionPlan> plans = balanceMasterRegions(clusterState);
  // Nothing more to do when master-region plans exist, or there are fewer than two servers.
  if (plans != null || clusterState == null || clusterState.size() <= 1) {
    return plans;
  }
  if (masterServerName != null && clusterState.containsKey(masterServerName)) {
    // With the master excluded there would be at most one server left to balance.
    if (clusterState.size() <= 2) {
      return null;
    }
    // Work on a copy so the caller's map is not modified when the master is dropped.
    clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
    clusterState.remove(masterServerName);
  }

  // Only hand the region location finder to the Cluster when the locality cost
  // actually carries weight.
  RegionLocationFinder finder = null;
  if (this.localityCost != null && this.localityCost.getMultiplier() > 0) {
    finder = this.regionFinder;
  }

  // Snapshot of the cluster that the search mutates; 'loads' holds the remembered
  // region loads.
  Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
  if (!needsBalance(cluster)) {
    return null;
  }

  long startTime = EnvironmentEdgeManager.currentTime();

  initCosts(cluster);

  // Double.MAX_VALUE as previous cost: the initial cost is always summed in full.
  double currentCost = computeCost(cluster, Double.MAX_VALUE);

  double initCost = currentCost;
  double newCost = currentCost;

  // Step budget scales with regions * stepsPerRegion * servers (in long arithmetic),
  // capped by maxSteps.
  long computedMaxSteps = Math.min(this.maxSteps,
    ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));

  long step;

  for (step = 0; step < computedMaxSteps; step++) {
    Cluster.Action action = nextAction(cluster);

    // A NULL action still counts as a step. Note that this 'continue' also skips the
    // running-time check at the bottom of the loop.
    if (action.type == Type.NULL) {
      continue;
    }

    cluster.doAction(action);
    updateCostsWithAction(cluster, action);

    // currentCost lets computeCost() stop summing once the candidate is already worse.
    newCost = computeCost(cluster, currentCost);

    // Keep the action only on a strict improvement.
    if (newCost < currentCost) {
      currentCost = newCost;
    } else {
      // Otherwise roll the cluster and the cost functions back to the previous state.
      Action undoAction = action.undoAction();
      cluster.doAction(undoAction);
      updateCostsWithAction(cluster, undoAction);
    }

    // Stop once the time budget is exhausted.
    if (EnvironmentEdgeManager.currentTime() - startTime >
      maxRunningTime) {
      break;
    }
  }
  long endTime = EnvironmentEdgeManager.currentTime();

  metricsBalancer.balanceCluster(endTime - startTime);

  // Only emit plans when the search ended on a strictly lower cost than it started with.
  if (initCost > currentCost) {
    plans = createRegionPlans(cluster);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Finished computing new load balance plan.  Computation took "
        + (endTime - startTime) + "ms to try " + step
        + " different iterations.  Found a solution that moves "
        + plans.size() + " regions; Going from a computed cost of "
        + initCost + " to a new cost of " + currentCost);
    }
    return plans;
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Could not find a better load balance plan.  Tried "
      + step + " different configurations in " + (endTime - startTime)
      + "ms, and did not find anything with a computed cost less than " + initCost);
  }
  return null;
}
325
326
327
328
329
330
331
332
333 private List<RegionPlan> createRegionPlans(Cluster cluster) {
334 List<RegionPlan> plans = new LinkedList<RegionPlan>();
335 for (int regionIndex = 0;
336 regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
337 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
338 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
339
340 if (initialServerIndex != newServerIndex) {
341 HRegionInfo region = cluster.regions[regionIndex];
342 ServerName initialServer = cluster.servers[initialServerIndex];
343 ServerName newServer = cluster.servers[newServerIndex];
344
345 if (LOG.isTraceEnabled()) {
346 LOG.trace("Moving Region " + region.getEncodedName() + " from server "
347 + initialServer.getHostname() + " to " + newServer.getHostname());
348 }
349 RegionPlan rp = new RegionPlan(region, initialServer, newServer);
350 plans.add(rp);
351 }
352 }
353 return plans;
354 }
355
356
357
358
359 private synchronized void updateRegionLoad() {
360
361
362 Map<String, Deque<RegionLoad>> oldLoads = loads;
363 loads = new HashMap<String, Deque<RegionLoad>>();
364
365 for (ServerName sn : clusterStatus.getServers()) {
366 ServerLoad sl = clusterStatus.getLoad(sn);
367 if (sl == null) {
368 continue;
369 }
370 for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
371 Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
372 if (rLoads == null) {
373
374 rLoads = new ArrayDeque<RegionLoad>();
375 } else if (rLoads.size() >= numRegionLoadsToRemember) {
376 rLoads.remove();
377 }
378 rLoads.add(entry.getValue());
379 loads.put(Bytes.toString(entry.getKey()), rLoads);
380
381 }
382 }
383
384 for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
385 cost.setLoads(loads);
386 }
387 }
388
389 protected void initCosts(Cluster cluster) {
390 for (CostFunction c:costFunctions) {
391 c.init(cluster);
392 }
393 }
394
395 protected void updateCostsWithAction(Cluster cluster, Action action) {
396 for (CostFunction c : costFunctions) {
397 c.postAction(action);
398 }
399 }
400
401
402
403
404
405
406
407
408
409
410 protected double computeCost(Cluster cluster, double previousCost) {
411 double total = 0;
412
413 for (CostFunction c:costFunctions) {
414 if (c.getMultiplier() <= 0) {
415 continue;
416 }
417
418 total += c.getMultiplier() * c.cost();
419
420 if (total > previousCost) {
421 return total;
422 }
423 }
424 return total;
425 }
426
427
428 abstract static class CandidateGenerator {
429 abstract Cluster.Action generate(Cluster cluster);
430
431
432
433
434
435
436
437
438
439
440
441
442
443 protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
444
445 if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
446
447 return -1;
448 }
449 int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
450 return cluster.regionsPerServer[server][rand];
451
452 }
453 protected int pickRandomServer(Cluster cluster) {
454 if (cluster.numServers < 1) {
455 return -1;
456 }
457
458 return RANDOM.nextInt(cluster.numServers);
459 }
460
461 protected int pickRandomRack(Cluster cluster) {
462 if (cluster.numRacks < 1) {
463 return -1;
464 }
465
466 return RANDOM.nextInt(cluster.numRacks);
467 }
468
469 protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
470 if (cluster.numServers < 2) {
471 return -1;
472 }
473 while (true) {
474 int otherServerIndex = pickRandomServer(cluster);
475 if (otherServerIndex != serverIndex) {
476 return otherServerIndex;
477 }
478 }
479 }
480
481 protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
482 if (cluster.numRacks < 2) {
483 return -1;
484 }
485 while (true) {
486 int otherRackIndex = pickRandomRack(cluster);
487 if (otherRackIndex != rackIndex) {
488 return otherRackIndex;
489 }
490 }
491 }
492
493 protected Cluster.Action pickRandomRegions(Cluster cluster,
494 int thisServer,
495 int otherServer) {
496 if (thisServer < 0 || otherServer < 0) {
497 return Cluster.NullAction;
498 }
499
500
501 int thisRegionCount = cluster.getNumRegions(thisServer);
502 int otherRegionCount = cluster.getNumRegions(otherServer);
503
504
505 double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
506 double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
507
508 int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
509 int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
510
511 return getAction(thisServer, thisRegion, otherServer, otherRegion);
512 }
513
514 protected Cluster.Action getAction (int fromServer, int fromRegion,
515 int toServer, int toRegion) {
516 if (fromServer < 0 || toServer < 0) {
517 return Cluster.NullAction;
518 }
519 if (fromRegion > 0 && toRegion > 0) {
520 return new Cluster.SwapRegionsAction(fromServer, fromRegion,
521 toServer, toRegion);
522 } else if (fromRegion > 0) {
523 return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
524 } else if (toRegion > 0) {
525 return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
526 } else {
527 return Cluster.NullAction;
528 }
529 }
530 }
531
532 static class RandomCandidateGenerator extends CandidateGenerator {
533
534 @Override
535 Cluster.Action generate(Cluster cluster) {
536
537 int thisServer = pickRandomServer(cluster);
538
539
540 int otherServer = pickOtherRandomServer(cluster, thisServer);
541
542 return pickRandomRegions(cluster, thisServer, otherServer);
543 }
544 }
545
546 static class LoadCandidateGenerator extends CandidateGenerator {
547
548 @Override
549 Cluster.Action generate(Cluster cluster) {
550 cluster.sortServersByRegionCount();
551 int thisServer = pickMostLoadedServer(cluster, -1);
552 int otherServer = pickLeastLoadedServer(cluster, thisServer);
553
554 return pickRandomRegions(cluster, thisServer, otherServer);
555 }
556
557 private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
558 Integer[] servers = cluster.serverIndicesSortedByRegionCount;
559
560 int index = 0;
561 while (servers[index] == null || servers[index] == thisServer) {
562 index++;
563 if (index == servers.length) {
564 return -1;
565 }
566 }
567 return servers[index];
568 }
569
570 private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
571 Integer[] servers = cluster.serverIndicesSortedByRegionCount;
572
573 int index = servers.length - 1;
574 while (servers[index] == null || servers[index] == thisServer) {
575 index--;
576 if (index < 0) {
577 return -1;
578 }
579 }
580 return servers[index];
581 }
582 }
583
584 static class LocalityBasedCandidateGenerator extends CandidateGenerator {
585
586 private MasterServices masterServices;
587
588 LocalityBasedCandidateGenerator(MasterServices masterServices) {
589 this.masterServices = masterServices;
590 }
591
592 @Override
593 Cluster.Action generate(Cluster cluster) {
594 if (this.masterServices == null) {
595 int thisServer = pickRandomServer(cluster);
596
597 int otherServer = pickOtherRandomServer(cluster, thisServer);
598 return pickRandomRegions(cluster, thisServer, otherServer);
599 }
600
601 cluster.calculateRegionServerLocalities();
602
603 int thisServer = pickLowestLocalityServer(cluster);
604 int thisRegion;
605 if (thisServer == -1) {
606 LOG.warn("Could not pick lowest locality region server");
607 return Cluster.NullAction;
608 } else {
609
610 thisRegion = pickLowestLocalityRegionOnServer(cluster, thisServer);
611 }
612
613 if (thisRegion == -1) {
614 return Cluster.NullAction;
615 }
616
617
618 int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion);
619
620 if (otherServer == -1) {
621 return Cluster.NullAction;
622 }
623
624
625 int otherRegion = -1;
626
627 return getAction(thisServer, thisRegion, otherServer, otherRegion);
628 }
629
630 private int pickLowestLocalityServer(Cluster cluster) {
631 return cluster.getLowestLocalityRegionServer();
632 }
633
634 private int pickLowestLocalityRegionOnServer(Cluster cluster, int server) {
635 return cluster.getLowestLocalityRegionOnServer(server);
636 }
637
638 void setServices(MasterServices services) {
639 this.masterServices = services;
640 }
641 }
642
643
644
645
646
647 static class RegionReplicaCandidateGenerator extends CandidateGenerator {
648
649 RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
650
651
652
653
654
655
656
657
658
659
660 int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
661 , int[] regionIndexToPrimaryIndex) {
662 int currentPrimary = -1;
663 int currentPrimaryIndex = -1;
664 int selectedPrimaryIndex = -1;
665 double currentLargestRandom = -1;
666
667
668
669 for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
670 int primary = j < primariesOfRegionsPerGroup.length
671 ? primariesOfRegionsPerGroup[j] : -1;
672 if (primary != currentPrimary) {
673 int numReplicas = j - currentPrimaryIndex;
674 if (numReplicas > 1) {
675
676 double currentRandom = RANDOM.nextDouble();
677
678
679 if (currentRandom > currentLargestRandom) {
680 selectedPrimaryIndex = currentPrimary;
681 currentLargestRandom = currentRandom;
682 }
683 }
684 currentPrimary = primary;
685 currentPrimaryIndex = j;
686 }
687 }
688
689
690
691 for (int j = 0; j < regionsPerGroup.length; j++) {
692 int regionIndex = regionsPerGroup[j];
693 if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
694
695 if (selectedPrimaryIndex != regionIndex) {
696 return regionIndex;
697 }
698 }
699 }
700 return -1;
701 }
702
703 @Override
704 Cluster.Action generate(Cluster cluster) {
705 int serverIndex = pickRandomServer(cluster);
706 if (cluster.numServers <= 1 || serverIndex == -1) {
707 return Cluster.NullAction;
708 }
709
710 int regionIndex = selectCoHostedRegionPerGroup(
711 cluster.primariesOfRegionsPerServer[serverIndex],
712 cluster.regionsPerServer[serverIndex],
713 cluster.regionIndexToPrimaryIndex);
714
715
716 if (regionIndex == -1) {
717
718 return randomGenerator.generate(cluster);
719 }
720
721 int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
722 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
723 return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
724 }
725 }
726
727
728
729
730
731 static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
732 @Override
733 Cluster.Action generate(Cluster cluster) {
734 int rackIndex = pickRandomRack(cluster);
735 if (cluster.numRacks <= 1 || rackIndex == -1) {
736 return super.generate(cluster);
737 }
738
739 int regionIndex = selectCoHostedRegionPerGroup(
740 cluster.primariesOfRegionsPerRack[rackIndex],
741 cluster.regionsPerRack[rackIndex],
742 cluster.regionIndexToPrimaryIndex);
743
744
745 if (regionIndex == -1) {
746
747 return randomGenerator.generate(cluster);
748 }
749
750 int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
751 int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
752
753 int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
754 int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
755 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
756 return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
757 }
758 }
759
760
761
762
763 abstract static class CostFunction {
764
765 private float multiplier = 0;
766
767 protected Cluster cluster;
768
769 CostFunction(Configuration c) {
770
771 }
772
773 float getMultiplier() {
774 return multiplier;
775 }
776
777 void setMultiplier(float m) {
778 this.multiplier = m;
779 }
780
781
782
783
784 void init(Cluster cluster) {
785 this.cluster = cluster;
786 }
787
788
789
790
791
792 void postAction(Action action) {
793 switch (action.type) {
794 case NULL: break;
795 case ASSIGN_REGION:
796 AssignRegionAction ar = (AssignRegionAction) action;
797 regionMoved(ar.region, -1, ar.server);
798 break;
799 case MOVE_REGION:
800 MoveRegionAction mra = (MoveRegionAction) action;
801 regionMoved(mra.region, mra.fromServer, mra.toServer);
802 break;
803 case SWAP_REGIONS:
804 SwapRegionsAction a = (SwapRegionsAction) action;
805 regionMoved(a.fromRegion, a.fromServer, a.toServer);
806 regionMoved(a.toRegion, a.toServer, a.fromServer);
807 break;
808 default:
809 throw new RuntimeException("Uknown action:" + action.type);
810 }
811 }
812
813 protected void regionMoved(int region, int oldServer, int newServer) {
814 }
815
816 abstract double cost();
817
818
819
820
821
822
823
824
825
826 protected double costFromArray(double[] stats) {
827 double totalCost = 0;
828 double total = getSum(stats);
829
830 double count = stats.length;
831 double mean = total/count;
832
833
834
835 double max = ((count - 1) * mean) + (total - mean);
836
837
838 double min;
839 if (count > total) {
840 min = ((count - total) * mean) + ((1 - mean) * total);
841 } else {
842
843 int numHigh = (int) (total - (Math.floor(mean) * count));
844 int numLow = (int) (count - numHigh);
845
846 min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
847
848 }
849 min = Math.max(0, min);
850 for (int i=0; i<stats.length; i++) {
851 double n = stats[i];
852 double diff = Math.abs(mean - n);
853 totalCost += diff;
854 }
855
856 double scaled = scale(min, max, totalCost);
857 return scaled;
858 }
859
860 private double getSum(double[] stats) {
861 double total = 0;
862 for(double s:stats) {
863 total += s;
864 }
865 return total;
866 }
867
868
869
870
871
872
873
874
875
876 protected double scale(double min, double max, double value) {
877 if (max <= min || value <= min) {
878 return 0;
879 }
880 if ((max - min) == 0) return 0;
881
882 return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
883 }
884 }
885
886
887
888
889
890 static class MoveCostFunction extends CostFunction {
891 private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
892 private static final String MAX_MOVES_PERCENT_KEY =
893 "hbase.master.balancer.stochastic.maxMovePercent";
894 private static final float DEFAULT_MOVE_COST = 100;
895 private static final int DEFAULT_MAX_MOVES = 600;
896 private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
897
898 private final float maxMovesPercent;
899
900 MoveCostFunction(Configuration conf) {
901 super(conf);
902
903
904
905 this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
906
907 maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
908 }
909
910 @Override
911 double cost() {
912
913 int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
914 DEFAULT_MAX_MOVES);
915
916 double moveCost = cluster.numMovedRegions;
917
918
919
920 if (moveCost > maxMoves) {
921 return 1000000;
922 }
923
924 return scale(0, cluster.numRegions, moveCost);
925 }
926 }
927
928
929
930
931
932 static class RegionCountSkewCostFunction extends CostFunction {
933 private static final String REGION_COUNT_SKEW_COST_KEY =
934 "hbase.master.balancer.stochastic.regionCountCost";
935 private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
936
937 private double[] stats = null;
938
939 RegionCountSkewCostFunction(Configuration conf) {
940 super(conf);
941
942 this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
943 }
944
945 @Override
946 double cost() {
947 if (stats == null || stats.length != cluster.numServers) {
948 stats = new double[cluster.numServers];
949 }
950
951 for (int i =0; i < cluster.numServers; i++) {
952 stats[i] = cluster.regionsPerServer[i].length;
953 }
954
955 return costFromArray(stats);
956 }
957 }
958
959
960
961
962
963 static class PrimaryRegionCountSkewCostFunction extends CostFunction {
964 private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
965 "hbase.master.balancer.stochastic.primaryRegionCountCost";
966 private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
967
968 private double[] stats = null;
969
970 PrimaryRegionCountSkewCostFunction(Configuration conf) {
971 super(conf);
972
973 this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
974 DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
975 }
976
977 @Override
978 double cost() {
979 if (!cluster.hasRegionReplicas) {
980 return 0;
981 }
982 if (stats == null || stats.length != cluster.numServers) {
983 stats = new double[cluster.numServers];
984 }
985
986 for (int i =0; i < cluster.numServers; i++) {
987 stats[i] = 0;
988 for (int regionIdx : cluster.regionsPerServer[i]) {
989 if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
990 stats[i] ++;
991 }
992 }
993 }
994
995 return costFromArray(stats);
996 }
997 }
998
999
1000
1001
1002
1003 static class TableSkewCostFunction extends CostFunction {
1004
1005 private static final String TABLE_SKEW_COST_KEY =
1006 "hbase.master.balancer.stochastic.tableSkewCost";
1007 private static final float DEFAULT_TABLE_SKEW_COST = 35;
1008
1009 TableSkewCostFunction(Configuration conf) {
1010 super(conf);
1011 this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1012 }
1013
1014 @Override
1015 double cost() {
1016 double max = cluster.numRegions;
1017 double min = ((double) cluster.numRegions) / cluster.numServers;
1018 double value = 0;
1019
1020 for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1021 value += cluster.numMaxRegionsPerTable[i];
1022 }
1023
1024 return scale(min, max, value);
1025 }
1026 }
1027
1028
1029
1030
1031
1032 static class LocalityCostFunction extends CostFunction {
1033
1034 private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1035 private static final float DEFAULT_LOCALITY_COST = 25;
1036
1037 private MasterServices services;
1038
1039 LocalityCostFunction(Configuration conf, MasterServices srv) {
1040 super(conf);
1041 this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
1042 this.services = srv;
1043 }
1044
1045 void setServices(MasterServices srvc) {
1046 this.services = srvc;
1047 }
1048
1049 @Override
1050 double cost() {
1051 double max = 0;
1052 double cost = 0;
1053
1054
1055 if (this.services == null) {
1056 return cost;
1057 }
1058
1059 for (int i = 0; i < cluster.regionLocations.length; i++) {
1060 max += 1;
1061 int serverIndex = cluster.regionIndexToServerIndex[i];
1062 int[] regionLocations = cluster.regionLocations[i];
1063
1064
1065
1066 if (regionLocations == null) {
1067 continue;
1068 }
1069
1070 int index = -1;
1071 for (int j = 0; j < regionLocations.length; j++) {
1072 if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
1073 index = j;
1074 break;
1075 }
1076 }
1077
1078 if (index < 0) {
1079 cost += 1;
1080 } else {
1081 cost += (1 - cluster.getLocalityOfRegion(i, index));
1082 }
1083 }
1084 return scale(0, max, cost);
1085 }
1086 }
1087
1088
1089
1090
1091
1092 abstract static class CostFromRegionLoadFunction extends CostFunction {
1093
1094 private ClusterStatus clusterStatus = null;
1095 private Map<String, Deque<RegionLoad>> loads = null;
1096 private double[] stats = null;
1097 CostFromRegionLoadFunction(Configuration conf) {
1098 super(conf);
1099 }
1100
1101 void setClusterStatus(ClusterStatus status) {
1102 this.clusterStatus = status;
1103 }
1104
1105 void setLoads(Map<String, Deque<RegionLoad>> l) {
1106 this.loads = l;
1107 }
1108
1109 @Override
1110 double cost() {
1111 if (clusterStatus == null || loads == null) {
1112 return 0;
1113 }
1114
1115 if (stats == null || stats.length != cluster.numServers) {
1116 stats = new double[cluster.numServers];
1117 }
1118
1119 for (int i =0; i < stats.length; i++) {
1120
1121 long cost = 0;
1122
1123
1124 for(int regionIndex:cluster.regionsPerServer[i]) {
1125 Collection<RegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
1126
1127
1128 if (regionLoadList != null) {
1129 cost += getRegionLoadCost(regionLoadList);
1130 }
1131 }
1132
1133
1134 stats[i] = cost;
1135 }
1136
1137
1138 return costFromArray(stats);
1139 }
1140
1141 protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
1142 double cost = 0;
1143
1144 for (RegionLoad rl : regionLoadList) {
1145 double toAdd = getCostFromRl(rl);
1146
1147 if (cost == 0) {
1148 cost = toAdd;
1149 } else {
1150 cost = (.5 * cost) + (.5 * toAdd);
1151 }
1152 }
1153
1154 return cost;
1155 }
1156
1157 protected abstract double getCostFromRl(RegionLoad rl);
1158 }
1159
1160
1161
1162
1163
1164
1165 static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
1166
1167 private static final String READ_REQUEST_COST_KEY =
1168 "hbase.master.balancer.stochastic.readRequestCost";
1169 private static final float DEFAULT_READ_REQUEST_COST = 5;
1170
1171 ReadRequestCostFunction(Configuration conf) {
1172 super(conf);
1173 this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1174 }
1175
1176
1177 @Override
1178 protected double getCostFromRl(RegionLoad rl) {
1179 return rl.getReadRequestsCount();
1180 }
1181 }
1182
1183
1184
1185
1186
1187 static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
1188
1189 private static final String WRITE_REQUEST_COST_KEY =
1190 "hbase.master.balancer.stochastic.writeRequestCost";
1191 private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1192
1193 WriteRequestCostFunction(Configuration conf) {
1194 super(conf);
1195 this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1196 }
1197
1198 @Override
1199 protected double getCostFromRl(RegionLoad rl) {
1200 return rl.getWriteRequestsCount();
1201 }
1202 }
1203
1204
1205
1206
1207
1208
1209
1210 static class RegionReplicaHostCostFunction extends CostFunction {
1211 private static final String REGION_REPLICA_HOST_COST_KEY =
1212 "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1213 private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1214
1215 long maxCost = 0;
1216 long[] costsPerGroup;
1217 int[][] primariesOfRegionsPerGroup;
1218
1219 public RegionReplicaHostCostFunction(Configuration conf) {
1220 super(conf);
1221 this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1222 DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1223 }
1224
1225 @Override
1226 void init(Cluster cluster) {
1227 super.init(cluster);
1228
1229 maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1230 costsPerGroup = new long[cluster.numHosts];
1231 primariesOfRegionsPerGroup = cluster.multiServersPerHost
1232 ? cluster.primariesOfRegionsPerHost
1233 : cluster.primariesOfRegionsPerServer;
1234 for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1235 costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1236 }
1237 }
1238
1239 long getMaxCost(Cluster cluster) {
1240 if (!cluster.hasRegionReplicas) {
1241 return 0;
1242 }
1243
1244 int[] primariesOfRegions = new int[cluster.numRegions];
1245 System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1246 cluster.regions.length);
1247
1248 Arrays.sort(primariesOfRegions);
1249
1250
1251 return costPerGroup(primariesOfRegions);
1252 }
1253
1254 @Override
1255 double cost() {
1256 if (maxCost <= 0) {
1257 return 0;
1258 }
1259
1260 long totalCost = 0;
1261 for (int i = 0 ; i < costsPerGroup.length; i++) {
1262 totalCost += costsPerGroup[i];
1263 }
1264 return scale(0, maxCost, totalCost);
1265 }
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275 protected long costPerGroup(int[] primariesOfRegions) {
1276 long cost = 0;
1277 int currentPrimary = -1;
1278 int currentPrimaryIndex = -1;
1279
1280
1281 for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1282 int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1283 if (primary != currentPrimary) {
1284 int numReplicas = j - currentPrimaryIndex;
1285
1286 if (numReplicas > 1) {
1287 cost += (numReplicas - 1) * (numReplicas - 1);
1288 }
1289 currentPrimary = primary;
1290 currentPrimaryIndex = j;
1291 }
1292 }
1293
1294 return cost;
1295 }
1296
1297 @Override
1298 protected void regionMoved(int region, int oldServer, int newServer) {
1299 if (maxCost <= 0) {
1300 return;
1301 }
1302 if (cluster.multiServersPerHost) {
1303 int oldHost = cluster.serverIndexToHostIndex[oldServer];
1304 int newHost = cluster.serverIndexToHostIndex[newServer];
1305 if (newHost != oldHost) {
1306 costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1307 costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1308 }
1309 } else {
1310 costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1311 costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1312 }
1313 }
1314 }
1315
1316
1317
1318
1319
1320
1321 static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1322 private static final String REGION_REPLICA_RACK_COST_KEY =
1323 "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1324 private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1325
1326 public RegionReplicaRackCostFunction(Configuration conf) {
1327 super(conf);
1328 this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1329 }
1330
1331 @Override
1332 void init(Cluster cluster) {
1333 this.cluster = cluster;
1334 if (cluster.numRacks <= 1) {
1335 maxCost = 0;
1336 return;
1337 }
1338
1339 maxCost = getMaxCost(cluster);
1340 costsPerGroup = new long[cluster.numRacks];
1341 for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1342 costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1343 }
1344 }
1345
1346 @Override
1347 protected void regionMoved(int region, int oldServer, int newServer) {
1348 if (maxCost <= 0) {
1349 return;
1350 }
1351 int oldRack = cluster.serverIndexToRackIndex[oldServer];
1352 int newRack = cluster.serverIndexToRackIndex[newServer];
1353 if (newRack != oldRack) {
1354 costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1355 costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1356 }
1357 }
1358 }
1359
1360
1361
1362
1363
1364 static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
1365
1366 private static final String MEMSTORE_SIZE_COST_KEY =
1367 "hbase.master.balancer.stochastic.memstoreSizeCost";
1368 private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1369
1370 MemstoreSizeCostFunction(Configuration conf) {
1371 super(conf);
1372 this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1373 }
1374
1375 @Override
1376 protected double getCostFromRl(RegionLoad rl) {
1377 return rl.getMemStoreSizeMB();
1378 }
1379 }
1380
1381
1382
1383
1384 static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1385
1386 private static final String STOREFILE_SIZE_COST_KEY =
1387 "hbase.master.balancer.stochastic.storefileSizeCost";
1388 private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1389
1390 StoreFileCostFunction(Configuration conf) {
1391 super(conf);
1392 this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1393 }
1394
1395 @Override
1396 protected double getCostFromRl(RegionLoad rl) {
1397 return rl.getStorefileSizeMB();
1398 }
1399 }
1400 }