1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Map.Entry;
26 import java.util.Random;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
31 import org.apache.hadoop.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.ClusterStatus;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.RegionLoad;
36 import org.apache.hadoop.hbase.ServerLoad;
37 import org.apache.hadoop.hbase.ServerName;
38 import org.apache.hadoop.hbase.master.MasterServices;
39 import org.apache.hadoop.hbase.master.RegionPlan;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 @InterfaceAudience.Private
87 public class StochasticLoadBalancer extends BaseLoadBalancer {
88
89 private static final String STOREFILE_SIZE_COST_KEY =
90 "hbase.master.balancer.stochastic.storefileSizeCost";
91 private static final String MEMSTORE_SIZE_COST_KEY =
92 "hbase.master.balancer.stochastic.memstoreSizeCost";
93 private static final String WRITE_REQUEST_COST_KEY =
94 "hbase.master.balancer.stochastic.writeRequestCost";
95 private static final String READ_REQUEST_COST_KEY =
96 "hbase.master.balancer.stochastic.readRequestCost";
97 private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
98 private static final String TABLE_LOAD_COST_KEY =
99 "hbase.master.balancer.stochastic.tableLoadCost";
100 private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
101 private static final String REGION_LOAD_COST_KEY =
102 "hbase.master.balancer.stochastic.regionLoadCost";
103 private static final String STEPS_PER_REGION_KEY =
104 "hbase.master.balancer.stochastic.stepsPerRegion";
105 private static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
106 private static final String MAX_MOVES_KEY = "hbase.master.balancer.stochastic.maxMoveRegions";
107 private static final String MAX_RUNNING_TIME_KEY = "hbase.master.balancer.stochastic.maxRunningTime";
108 private static final String KEEP_REGION_LOADS = "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
109
110 private static final Random RANDOM = new Random(System.currentTimeMillis());
111 private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
112 private final RegionLocationFinder regionFinder = new RegionLocationFinder();
113 private ClusterStatus clusterStatus = null;
114 private Map<String, List<RegionLoad>> loads = new HashMap<String, List<RegionLoad>>();
115
116
117 private int maxSteps = 15000;
118 private int stepsPerRegion = 110;
119 private long maxRunningTime = 60 * 1000;
120 private int maxMoves = 600;
121 private int numRegionLoadsToRemember = 15;
122 private float loadMultiplier = 100;
123 private float moveCostMultiplier = 1;
124 private float tableMultiplier = 5;
125 private float localityMultiplier = 5;
126 private float readRequestMultiplier = 0;
127 private float writeRequestMultiplier = 0;
128 private float memStoreSizeMultiplier = 5;
129 private float storeFileSizeMultiplier = 5;
130
131
132 @Override
133 public void setConf(Configuration conf) {
134 super.setConf(conf);
135 regionFinder.setConf(conf);
136
137 maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
138 maxMoves = conf.getInt(MAX_MOVES_KEY, maxMoves);
139 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
140 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
141
142 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
143
144
145 loadMultiplier = conf.getFloat(REGION_LOAD_COST_KEY, loadMultiplier);
146
147
148
149 moveCostMultiplier = conf.getFloat(MOVE_COST_KEY, moveCostMultiplier);
150
151
152
153 tableMultiplier = conf.getFloat(TABLE_LOAD_COST_KEY, tableMultiplier);
154 localityMultiplier = conf.getFloat(LOCALITY_COST_KEY, localityMultiplier);
155 memStoreSizeMultiplier = conf.getFloat(MEMSTORE_SIZE_COST_KEY, memStoreSizeMultiplier);
156 storeFileSizeMultiplier = conf.getFloat(STOREFILE_SIZE_COST_KEY, storeFileSizeMultiplier);
157 readRequestMultiplier = conf.getFloat(READ_REQUEST_COST_KEY, readRequestMultiplier);
158 writeRequestMultiplier = conf.getFloat(WRITE_REQUEST_COST_KEY, writeRequestMultiplier);
159 }
160
161 @Override
162 public void setClusterStatus(ClusterStatus st) {
163 super.setClusterStatus(st);
164 regionFinder.setClusterStatus(st);
165 this.clusterStatus = st;
166 updateRegionLoad();
167 }
168
169 @Override
170 public void setMasterServices(MasterServices masterServices) {
171 super.setMasterServices(masterServices);
172 this.services = masterServices;
173 this.regionFinder.setServices(masterServices);
174 }
175
176
177
178
179
180 @Override
181 public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
182
183 if (!needsBalance(new ClusterLoadState(clusterState))) {
184 return null;
185 }
186
187 long startTime = EnvironmentEdgeManager.currentTimeMillis();
188
189
190 double currentCost, newCost, initCost;
191
192 Cluster cluster = new Cluster(clusterState, loads, regionFinder);
193 currentCost = newCost = initCost = computeCost(cluster);
194
195 int computedMaxSteps =
196 Math.min(this.maxSteps, (cluster.numRegions * this.stepsPerRegion));
197
198 int step;
199 for (step = 0; step < computedMaxSteps; step++) {
200
201
202 for (int leftServer = 0; leftServer < cluster.numServers; leftServer++) {
203
204
205 int rightServer = pickOtherServer(leftServer, cluster);
206 if (rightServer < 0) {
207 continue;
208 }
209
210
211
212 int lRegion = pickRandomRegion(cluster, leftServer, 0);
213 int rRegion = pickRandomRegion(cluster, rightServer, 0.5);
214
215
216 if (lRegion < 0 && rRegion < 0) {
217 continue;
218 }
219
220 cluster.moveOrSwapRegion(leftServer, rightServer, lRegion, rRegion);
221
222 newCost = computeCost(cluster);
223
224 if (newCost < currentCost) {
225 currentCost = newCost;
226 } else {
227
228
229 cluster.moveOrSwapRegion(leftServer, rightServer, rRegion, lRegion);
230 }
231 }
232 if (EnvironmentEdgeManager.currentTimeMillis() - startTime > maxRunningTime) {
233 break;
234 }
235 }
236
237 long endTime = EnvironmentEdgeManager.currentTimeMillis();
238
239 if (initCost > currentCost) {
240 List<RegionPlan> plans = createRegionPlans(cluster);
241
242 if (LOG.isDebugEnabled()) {
243 LOG.debug("Finished computing new load balance plan. Computation took "
244 + (endTime - startTime) + "ms to try " + step
245 + " different iterations. Found a solution that moves " + plans.size()
246 + " regions; Going from a computed cost of " + initCost + " to a new cost of "
247 + currentCost);
248 }
249 return plans;
250 }
251 if (LOG.isDebugEnabled()) {
252 LOG.debug("Could not find a better load balance plan. Tried " + step
253 + " different configurations in " + (endTime - startTime)
254 + "ms, and did not find anything with a computed cost less than " + initCost);
255 }
256 return null;
257 }
258
259
260
261
262
263
264
265
266 private List<RegionPlan> createRegionPlans(Cluster cluster) {
267 List<RegionPlan> plans = new LinkedList<RegionPlan>();
268
269 for (int regionIndex = 0; regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
270 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
271 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
272 if (initialServerIndex != newServerIndex) {
273 HRegionInfo region = cluster.regions[regionIndex];
274 ServerName initialServer = cluster.servers[initialServerIndex];
275 ServerName newServer = cluster.servers[newServerIndex];
276 if (LOG.isTraceEnabled()) {
277 LOG.trace("Moving Region " + region.getEncodedName() + " from server "
278 + initialServer.getHostname() + " to " + newServer.getHostname());
279 }
280 RegionPlan rp = new RegionPlan(region, initialServer, newServer);
281 plans.add(rp);
282 }
283 }
284 return plans;
285 }
286
287
288 private synchronized void updateRegionLoad() {
289
290
291
292 Map<String, List<RegionLoad>> oldLoads = loads;
293 loads = new HashMap<String, List<RegionLoad>>();
294
295 for (ServerName sn : clusterStatus.getServers()) {
296 ServerLoad sl = clusterStatus.getLoad(sn);
297 if (sl == null) continue;
298 for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
299 List<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
300 if (rLoads != null) {
301
302
303 if (rLoads.size() >= numRegionLoadsToRemember) {
304 int numToRemove = 1 + (rLoads.size() - numRegionLoadsToRemember);
305
306 rLoads = rLoads.subList(numToRemove, rLoads.size());
307 }
308
309 } else {
310
311 rLoads = new ArrayList<RegionLoad>();
312 }
313 rLoads.add(entry.getValue());
314 loads.put(Bytes.toString(entry.getKey()), rLoads);
315
316 }
317 }
318 }
319
320
321
322
323
324
325
326
327
328
329
330
331
332 private int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
333
334 if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
335
336 return -1;
337 }
338 int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
339 return cluster.regionsPerServer[server][rand];
340
341 }
342
343
344
345
346
347
348
349
350
351 private int pickOtherServer(int serverIndex, Cluster cluster) {
352 if (cluster.numServers < 2) {
353 return -1;
354 }
355 while (true) {
356 int otherServerIndex = RANDOM.nextInt(cluster.numServers);
357 if (otherServerIndex != serverIndex) {
358 return otherServerIndex;
359 }
360 }
361 }
362
363
364
365
366
367
368
369
370 protected double computeCost(Cluster cluster) {
371 double moveCost = (moveCostMultiplier > 0) ?
372 (moveCostMultiplier * computeMoveCost(cluster)) :
373 0;
374
375 double regionCountSkewCost = (loadMultiplier > 0) ?
376 (loadMultiplier * computeSkewLoadCost(cluster)) :
377 0;
378
379 double tableSkewCost = (tableMultiplier > 0) ?
380 (tableMultiplier * computeTableSkewLoadCost(cluster)) :
381 0;
382
383 double localityCost = (localityMultiplier > 0) ?
384 (localityMultiplier * computeDataLocalityCost(cluster)) :
385 0;
386
387 double memstoreSizeCost =
388 (memStoreSizeMultiplier > 0) ?
389 (memStoreSizeMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.MEMSTORE_SIZE)) :
390 0;
391
392 double storefileSizeCost =
393 (storeFileSizeMultiplier > 0) ?
394 (storeFileSizeMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.STOREFILE_SIZE)):
395 0;
396
397 double readRequestCost =
398 (readRequestMultiplier > 0) ?
399 (readRequestMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.READ_REQUEST)) :
400 0;
401
402 double writeRequestCost =
403 (writeRequestMultiplier > 0) ?
404 (writeRequestMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.WRITE_REQUEST)) :
405 0;
406
407 double total =
408 moveCost + regionCountSkewCost + tableSkewCost + localityCost + memstoreSizeCost
409 + storefileSizeCost + readRequestCost + writeRequestCost;
410 if (LOG.isTraceEnabled()) {
411 LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = "
412 + moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = "
413 + tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = "
414 + memstoreSizeCost + " storefileSizeCost = " + storefileSizeCost);
415 }
416 return total;
417 }
418
419
420
421
422
423
424
425
426 double computeMoveCost(Cluster cluster) {
427 double moveCost = cluster.numMovedRegions;
428
429
430
431 if (moveCost > maxMoves) {
432 return Double.MAX_VALUE;
433 }
434
435
436 if (cluster.numMovedMetaRegions > 0) {
437 maxMoves += 9 * cluster.numMovedMetaRegions;
438 }
439
440 return scale(0, cluster.numRegions, moveCost);
441 }
442
443
444
445
446
447
448
449
450 double computeSkewLoadCost(Cluster cluster) {
451 DescriptiveStatistics stats = new DescriptiveStatistics();
452 for (int[] regions : cluster.regionsPerServer) {
453 stats.addValue(regions.length);
454 }
455 return costFromStats(stats);
456 }
457
458
459
460
461
462
463
464
465 double computeTableSkewLoadCost(Cluster cluster) {
466 double max = cluster.numRegions;
467 double min = cluster.numRegions / cluster.numServers;
468 double value = 0;
469
470 for (int i = 0 ; i < cluster.numMaxRegionsPerTable.length; i++) {
471 value += cluster.numMaxRegionsPerTable[i];
472 }
473
474 return scale(min, max, value);
475 }
476
477
478
479
480
481
482
483
484
485 double computeDataLocalityCost(Cluster cluster) {
486
487 double max = 0;
488 double cost = 0;
489
490
491 if (this.services == null) return cost;
492
493 for (int i = 0; i < cluster.regionLocations.length; i++) {
494 max += 1;
495 int serverIndex = cluster.regionIndexToServerIndex[i];
496 int[] regionLocations = cluster.regionLocations[i];
497
498
499
500 if (regionLocations == null) {
501 continue;
502 }
503
504 int index = -1;
505 for (int j = 0; j < regionLocations.length; j++) {
506 if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
507 index = j;
508 break;
509 }
510 }
511
512 if (index < 0) {
513 cost += 1;
514 } else {
515 cost += (double) index / (double) regionLocations.length;
516 }
517 }
518 return scale(0, max, cost);
519 }
520
521
522 private enum RegionLoadCostType {
523 READ_REQUEST, WRITE_REQUEST, MEMSTORE_SIZE, STOREFILE_SIZE
524 }
525
526
527
528
529
530
531
532
533 private double computeRegionLoadCost(Cluster cluster, RegionLoadCostType costType) {
534
535 if (this.clusterStatus == null || this.loads == null || this.loads.size() == 0) return 0;
536
537 DescriptiveStatistics stats = new DescriptiveStatistics();
538
539 for (List<RegionLoad> rl : cluster.regionLoads) {
540 long cost = 0;
541
542 if (rl != null) {
543 cost += getRegionLoadCost(rl, costType);
544 }
545
546
547 stats.addValue(cost);
548 }
549
550
551 return costFromStats(stats);
552 }
553
554
555
556
557
558
559
560
561 private double getRegionLoadCost(List<RegionLoad> regionLoadList, RegionLoadCostType type) {
562 double cost = 0;
563
564 int size = regionLoadList.size();
565 for(int i =0; i< size; i++) {
566 RegionLoad rl = regionLoadList.get(i);
567 double toAdd = 0;
568 switch (type) {
569 case READ_REQUEST:
570 toAdd = rl.getReadRequestsCount();
571 break;
572 case WRITE_REQUEST:
573 toAdd = rl.getWriteRequestsCount();
574 break;
575 case MEMSTORE_SIZE:
576 toAdd = rl.getMemStoreSizeMB();
577 break;
578 case STOREFILE_SIZE:
579 toAdd = rl.getStorefileSizeMB();
580 break;
581 default:
582 assert false : "RegionLoad cost type not supported.";
583 return 0;
584 }
585
586 if (cost == 0) {
587 cost = toAdd;
588 } else {
589 cost = (.5 * cost) + (.5 * toAdd);
590 }
591 }
592
593 return cost;
594
595 }
596
597
598
599
600
601
602
603
604
605 double costFromStats(DescriptiveStatistics stats) {
606 double totalCost = 0;
607 double mean = stats.getMean();
608
609
610
611
612 double max = ((stats.getN() - 1) * mean) + (stats.getSum() - mean);
613 for (double n : stats.getValues()) {
614 double diff = Math.abs(mean - n);
615 totalCost += diff;
616 }
617
618 return scale(0, max, totalCost);
619 }
620
621
622
623
624
625
626
627
628
629 private double scale(double min, double max, double value) {
630 if (max == 0 || value == 0) {
631 return 0;
632 }
633
634 return Math.max(0d, Math.min(1d, (value - min) / max));
635 }
636 }