1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NavigableMap;
27 import java.util.Random;
28 import java.util.TreeMap;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.ServerName;
36 import org.apache.hadoop.hbase.master.RegionPlan;
37
38 import com.google.common.collect.MinMaxPriorityQueue;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
58 public class SimpleLoadBalancer extends BaseLoadBalancer {
59 private static final Log LOG = LogFactory.getLog(SimpleLoadBalancer.class);
60 private static final Random RANDOM = new Random(System.currentTimeMillis());
61
62 private RegionInfoComparator riComparator = new RegionInfoComparator();
63 private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
64
65
66
67
68
69
70
71
72
73
74 static class BalanceInfo {
75
76 private final int nextRegionForUnload;
77 private int numRegionsAdded;
78
79 public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
80 this.nextRegionForUnload = nextRegionForUnload;
81 this.numRegionsAdded = numRegionsAdded;
82 }
83
84 int getNextRegionForUnload() {
85 return nextRegionForUnload;
86 }
87
88 int getNumRegionsAdded() {
89 return numRegionsAdded;
90 }
91
92 void setNumRegionsAdded(int numAdded) {
93 this.numRegionsAdded = numAdded;
94 }
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182 @Override
183 public List<RegionPlan> balanceCluster(
184 Map<ServerName, List<HRegionInfo>> clusterMap) {
185 List<RegionPlan> regionsToReturn = balanceMasterRegions(clusterMap);
186 if (regionsToReturn != null || clusterMap == null || clusterMap.size() <= 1) {
187 return regionsToReturn;
188 }
189 if (masterServerName != null && clusterMap.containsKey(masterServerName)) {
190 if (clusterMap.size() <= 2) {
191 return null;
192 }
193 clusterMap = new HashMap<ServerName, List<HRegionInfo>>(clusterMap);
194 clusterMap.remove(masterServerName);
195 }
196
197 long startTime = System.currentTimeMillis();
198
199
200
201 Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
202 if (!this.needsBalance(c)) return null;
203
204 ClusterLoadState cs = new ClusterLoadState(clusterMap);
205 int numServers = cs.getNumServers();
206 NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
207 int numRegions = cs.getNumRegions();
208 float average = cs.getLoadAverage();
209 int max = (int)Math.ceil(average);
210 int min = (int)average;
211
212
213 StringBuilder strBalanceParam = new StringBuilder();
214 strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
215 .append(", numServers=").append(numServers).append(", max=").append(max)
216 .append(", min=").append(min);
217 LOG.debug(strBalanceParam.toString());
218
219
220
221 MinMaxPriorityQueue<RegionPlan> regionsToMove =
222 MinMaxPriorityQueue.orderedBy(rpComparator).create();
223 regionsToReturn = new ArrayList<RegionPlan>();
224
225
226 int serversOverloaded = 0;
227
228 boolean fetchFromTail = false;
229 Map<ServerName, BalanceInfo> serverBalanceInfo =
230 new TreeMap<ServerName, BalanceInfo>();
231 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
232 serversByLoad.descendingMap().entrySet()) {
233 ServerAndLoad sal = server.getKey();
234 int load = sal.getLoad();
235 if (load <= max) {
236 serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
237 break;
238 }
239 serversOverloaded++;
240 List<HRegionInfo> regions = server.getValue();
241 int numToOffload = Math.min(load - max, regions.size());
242
243
244 Collections.sort(regions, riComparator);
245 int numTaken = 0;
246 for (int i = 0; i <= numToOffload; ) {
247 HRegionInfo hri = regions.get(i);
248 if (fetchFromTail) {
249 hri = regions.get(regions.size() - 1 - i);
250 }
251 i++;
252
253 if (shouldBeOnMaster(hri)
254 && masterServerName.equals(sal.getServerName())) continue;
255 regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
256 numTaken++;
257 if (numTaken >= numToOffload) break;
258 }
259 serverBalanceInfo.put(sal.getServerName(),
260 new BalanceInfo(numToOffload, (-1)*numTaken));
261 }
262 int totalNumMoved = regionsToMove.size();
263
264
265 int neededRegions = 0;
266 fetchFromTail = false;
267
268 Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
269 int maxToTake = numRegions - min;
270 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
271 serversByLoad.entrySet()) {
272 if (maxToTake == 0) break;
273 int load = server.getKey().getLoad();
274 if (load >= min && load > 0) {
275 continue;
276 }
277 int regionsToPut = min - load;
278 if (regionsToPut == 0)
279 {
280 regionsToPut = 1;
281 }
282 maxToTake -= regionsToPut;
283 underloadedServers.put(server.getKey().getServerName(), regionsToPut);
284 }
285
286 int serversUnderloaded = underloadedServers.size();
287 int incr = 1;
288 List<ServerName> sns =
289 Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
290 Collections.shuffle(sns, RANDOM);
291 while (regionsToMove.size() > 0) {
292 int cnt = 0;
293 int i = incr > 0 ? 0 : underloadedServers.size()-1;
294 for (; i >= 0 && i < underloadedServers.size(); i += incr) {
295 if (regionsToMove.isEmpty()) break;
296 ServerName si = sns.get(i);
297 int numToTake = underloadedServers.get(si);
298 if (numToTake == 0) continue;
299
300 addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
301
302 underloadedServers.put(si, numToTake-1);
303 cnt++;
304 BalanceInfo bi = serverBalanceInfo.get(si);
305 if (bi == null) {
306 bi = new BalanceInfo(0, 0);
307 serverBalanceInfo.put(si, bi);
308 }
309 bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
310 }
311 if (cnt == 0) break;
312
313 incr = -incr;
314 }
315 for (Integer i : underloadedServers.values()) {
316
317 neededRegions += i;
318 }
319
320
321
322 if (neededRegions == 0 && regionsToMove.isEmpty()) {
323 long endTime = System.currentTimeMillis();
324 LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
325 "Moving " + totalNumMoved + " regions off of " +
326 serversOverloaded + " overloaded servers onto " +
327 serversUnderloaded + " less loaded servers");
328 return regionsToReturn;
329 }
330
331
332
333
334
335 if (neededRegions != 0) {
336
337 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
338 serversByLoad.descendingMap().entrySet()) {
339 BalanceInfo balanceInfo =
340 serverBalanceInfo.get(server.getKey().getServerName());
341 int idx =
342 balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
343 if (idx >= server.getValue().size()) break;
344 HRegionInfo region = server.getValue().get(idx);
345 if (region.isMetaRegion()) continue;
346 regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
347 totalNumMoved++;
348 if (--neededRegions == 0) {
349
350 break;
351 }
352 }
353 }
354
355
356
357
358
359 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
360 serversByLoad.entrySet()) {
361 int regionCount = server.getKey().getLoad();
362 if (regionCount >= min) break;
363 BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
364 if(balanceInfo != null) {
365 regionCount += balanceInfo.getNumRegionsAdded();
366 }
367 if(regionCount >= min) {
368 continue;
369 }
370 int numToTake = min - regionCount;
371 int numTaken = 0;
372 while(numTaken < numToTake && 0 < regionsToMove.size()) {
373 addRegionPlan(regionsToMove, fetchFromTail,
374 server.getKey().getServerName(), regionsToReturn);
375 numTaken++;
376 }
377 }
378
379
380 if (0 < regionsToMove.size()) {
381 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
382 serversByLoad.entrySet()) {
383 int regionCount = server.getKey().getLoad();
384 BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
385 if(balanceInfo != null) {
386 regionCount += balanceInfo.getNumRegionsAdded();
387 }
388 if(regionCount >= max) {
389 break;
390 }
391 addRegionPlan(regionsToMove, fetchFromTail,
392 server.getKey().getServerName(), regionsToReturn);
393 if (regionsToMove.isEmpty()) {
394 break;
395 }
396 }
397 }
398
399 long endTime = System.currentTimeMillis();
400
401 if (!regionsToMove.isEmpty() || neededRegions != 0) {
402
403 LOG.warn("regionsToMove=" + totalNumMoved +
404 ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
405 ", serversUnderloaded=" + serversUnderloaded);
406 StringBuilder sb = new StringBuilder();
407 for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterMap.entrySet()) {
408 if (sb.length() > 0) sb.append(", ");
409 sb.append(e.getKey().toString());
410 sb.append(" ");
411 sb.append(e.getValue().size());
412 }
413 LOG.warn("Input " + sb.toString());
414 }
415
416
417 LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
418 "Moving " + totalNumMoved + " regions off of " +
419 serversOverloaded + " overloaded servers onto " +
420 serversUnderloaded + " less loaded servers");
421
422 return regionsToReturn;
423 }
424
425
426
427
428 private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
429 final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
430 RegionPlan rp = null;
431 if (!fetchFromTail) rp = regionsToMove.remove();
432 else rp = regionsToMove.removeLast();
433 rp.setDestination(sn);
434 regionsToReturn.add(rp);
435 }
436 }