1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NavigableMap;
27 import java.util.Random;
28 import java.util.TreeMap;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.ServerName;
36 import org.apache.hadoop.hbase.master.RegionPlan;
37
38 import com.google.common.collect.MinMaxPriorityQueue;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
 * Load balancer that evens out per-server load across region servers.
 *
 * <p>{@link #balanceCluster(Map)} computes the cluster's average load (via
 * {@code ClusterLoadState}) and produces {@link RegionPlan}s that try to leave every server with
 * a load between floor(average) and ceil(average). The load used is whatever
 * {@code ServerAndLoad.getLoad()} reports (presumably the server's region count — confirm
 * against ClusterLoadState).
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SimpleLoadBalancer extends BaseLoadBalancer {
  private static final Log LOG = LogFactory.getLog(SimpleLoadBalancer.class);
  // Seeded from the wall clock; used in balanceCluster to shuffle the order in which
  // underloaded servers receive regions.
  private static final Random RANDOM = new Random(System.currentTimeMillis());

  // Sorts an overloaded server's region list before picking the regions to offload.
  private RegionInfoComparator riComparator = new RegionInfoComparator();
  // Orders the queue of pending (destination-less) RegionPlans.
  private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();

  /**
   * Per-server bookkeeping used while {@code balanceCluster} computes a plan.
   *
   * <p>{@code nextRegionForUnload} is an index into the server's (sorted) region list. It
   * identifies the region that may be taken when more regions are needed after the first
   * offload pass. For overloaded servers it is set to the number of regions they were asked
   * to offload; otherwise it is 0.
   *
   * <p>{@code numRegionsAdded} is a net count of regions planned onto the server:
   * <ul>
   *   <li>it starts at minus the number of regions taken off an overloaded server (0 otherwise);
   *   <li>it is incremented during the round-robin distribution pass.
   * </ul>
   * The later fill passes in {@code balanceCluster} do not update it.
   */
  static class BalanceInfo {

    // Index of the next region in this server's region list to unload if more are needed.
    private final int nextRegionForUnload;
    // Net number of regions planned onto (positive) or off of (negative) this server.
    private int numRegionsAdded;

    /**
     * @param nextRegionForUnload index of the next region to unload from this server
     * @param numRegionsAdded initial net count of regions added (negative for offloads)
     */
    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
      this.nextRegionForUnload = nextRegionForUnload;
      this.numRegionsAdded = numRegionsAdded;
    }

    /** @return index of the next region to unload from this server */
    int getNextRegionForUnload() {
      return nextRegionForUnload;
    }

    /** @return net number of regions planned onto this server so far */
    int getNumRegionsAdded() {
      return numRegionsAdded;
    }

    /** @param numAdded new net number of regions planned onto this server */
    void setNumRegionsAdded(int numAdded) {
      this.numRegionsAdded = numAdded;
    }
  }

  /**
   * Computes region moves that even out load across the given servers.
   *
   * <p>Outline of the steps below. Here {@code min = floor(average)} and
   * {@code max = ceil(average)}.
   * <ol>
   *   <li>Early exits:
   *     <ul>
   *       <li>the non-null result of {@code balanceMasterRegions} is returned as-is;
   *       <li>a null map, or one with at most one server, returns null;
   *       <li>if the master is in the map and there are at most 2 servers, returns null;
   *       <li>if {@code needsBalance} says no, returns null.
   *     </ul>
   *     When the master is in the map it is removed from a copy; the caller's map is not
   *     modified by that removal.
   *   <li>Offload pass: walking from the most loaded server down, every server with load
   *       &gt; max has up to {@code load - max} regions queued for moving.
   *   <li>Distribution pass: servers with load &lt; min are given a quota of
   *       {@code min - load}. Queued regions are handed to them round-robin, in a shuffled
   *       server order and alternating sweep direction.
   *   <li>If every quota was met and the queue is empty, the plans are returned.
   *   <li>Otherwise, if quotas remain unmet, one more region is taken from each of the most
   *       loaded servers until enough have been queued.
   *   <li>Servers still below min are filled from the queue.
   *   <li>Any remaining queued regions go, one per server, to servers below max in ascending
   *       load order.
   *   <li>If regions are still queued or quotas still unmet, diagnostics are logged at WARN.
   * </ol>
   *
   * <p>Side effect: the region list of each overloaded server is sorted in place with
   * {@code riComparator}. Those lists come from {@code ClusterLoadState.getServersByLoad()};
   * whether they are the caller's own lists is not visible here — TODO confirm.
   *
   * @param clusterMap map from each server to the regions it currently hosts; may be null
   * @return the plans to execute (possibly an empty list), or null when no balancing is done
   */
  @Override
  public List<RegionPlan> balanceCluster(
      Map<ServerName, List<HRegionInfo>> clusterMap) {
    List<RegionPlan> regionsToReturn = balanceMasterRegions(clusterMap);
    if (regionsToReturn != null || clusterMap == null || clusterMap.size() <= 1) {
      return regionsToReturn;
    }
    if (masterServerName != null && clusterMap.containsKey(masterServerName)) {
      // The master plus one other server leaves nothing to balance between.
      if (clusterMap.size() <= 2) {
        return null;
      }
      // Copy before removing so the caller's map is not mutated.
      clusterMap = new HashMap<ServerName, List<HRegionInfo>>(clusterMap);
      clusterMap.remove(masterServerName);
    }

    long startTime = System.currentTimeMillis();

    // Build a Cluster view only to ask needsBalance whether any work is required.
    Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
    if (!this.needsBalance(c)) return null;

    ClusterLoadState cs = new ClusterLoadState(clusterMap);
    int numServers = cs.getNumServers();
    NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
    int numRegions = cs.getNumRegions();
    float average = cs.getLoadAverage();
    // Target band per server: [floor(average), ceil(average)].
    int max = (int)Math.ceil(average);
    int min = (int)average;

    // NOTE(review): this message is built even when debug logging is disabled.
    StringBuilder strBalanceParam = new StringBuilder();
    strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
        .append(", numServers=").append(numServers).append(", max=").append(max)
        .append(", min=").append(min);
    LOG.debug(strBalanceParam.toString());

    // Plans whose destination has not been chosen yet, ordered by rpComparator.
    MinMaxPriorityQueue<RegionPlan> regionsToMove =
        MinMaxPriorityQueue.orderedBy(rpComparator).create();
    regionsToReturn = new ArrayList<RegionPlan>();

    // ---- Offload pass: trim every server above max, most loaded first. ----
    int serversOverloaded = 0;
    // NOTE(review): fetchFromTail is never set to true anywhere in this method, so only
    // head-of-list / head-of-queue selection is actually exercised.
    boolean fetchFromTail = false;
    Map<ServerName, BalanceInfo> serverBalanceInfo =
        new TreeMap<ServerName, BalanceInfo>();
    for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
        serversByLoad.descendingMap().entrySet()) {
      ServerAndLoad sal = server.getKey();
      int load = sal.getLoad();
      if (load <= max) {
        // Descending order: no later server is overloaded either.
        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
        break;
      }
      serversOverloaded++;
      List<HRegionInfo> regions = server.getValue();
      int numToOffload = Math.min(load - max, regions.size());

      // Sorts the list in place; the later pass reads index nextRegionForUnload from it.
      Collections.sort(regions, riComparator);
      int numTaken = 0;
      // Inclusive bound gives one spare candidate index in case a region is skipped below;
      // the break caps the number actually taken at numToOffload.
      for (int i = 0; i <= numToOffload; ) {
        HRegionInfo hri = regions.get(i);
        if (fetchFromTail) {
          hri = regions.get(regions.size() - 1 - i);
        }
        i++;

        // Leave regions that should live on the master in place when this server is the master.
        // NOTE(review): the master was removed from clusterMap above when present, so this
        // presumably never matches. It also dereferences masterServerName without a null
        // check — confirm shouldBeOnMaster() is false whenever masterServerName is null.
        if (shouldBeOnMaster(hri)
            && masterServerName.equals(sal.getServerName())) continue;
        regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
        numTaken++;
        if (numTaken >= numToOffload) break;
      }
      serverBalanceInfo.put(sal.getServerName(),
          new BalanceInfo(numToOffload, (-1)*numTaken));
    }
    int totalNumMoved = regionsToMove.size();

    // ---- Distribution pass: fill servers below min, least loaded first. ----
    int neededRegions = 0;
    fetchFromTail = false;

    // Server -> number of regions it still needs to reach min.
    Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
    int maxToTake = numRegions - min;
    for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
        serversByLoad.entrySet()) {
      if (maxToTake == 0) break;
      int load = server.getKey().getLoad();
      if (load >= min) {
        continue;
      }
      int regionsToPut = min - load;
      maxToTake -= regionsToPut;
      underloadedServers.put(server.getKey().getServerName(), regionsToPut);
    }

    int serversUnderloaded = underloadedServers.size();
    // Sweep direction: +1 forward over sns, -1 backward; flipped after every sweep.
    int incr = 1;
    List<ServerName> sns =
        Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
    Collections.shuffle(sns, RANDOM);
    // Each sweep gives at most one queued region to every server with remaining quota.
    // Stops when the queue is empty or a full sweep assigned nothing.
    while (regionsToMove.size() > 0) {
      int cnt = 0;
      int i = incr > 0 ? 0 : underloadedServers.size()-1;
      for (; i >= 0 && i < underloadedServers.size(); i += incr) {
        if (regionsToMove.isEmpty()) break;
        ServerName si = sns.get(i);
        int numToTake = underloadedServers.get(si);
        if (numToTake == 0) continue;

        addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);

        underloadedServers.put(si, numToTake-1);
        cnt++;
        BalanceInfo bi = serverBalanceInfo.get(si);
        if (bi == null) {
          bi = new BalanceInfo(0, 0);
          serverBalanceInfo.put(si, bi);
        }
        bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
      }
      if (cnt == 0) break;

      incr = -incr;
    }
    for (Integer i : underloadedServers.values()) {
      // Remaining unmet quota across all underloaded servers.
      neededRegions += i;
    }

    // Every server reached min and every offloaded region found a home: done.
    if (neededRegions == 0 && regionsToMove.isEmpty()) {
      long endTime = System.currentTimeMillis();
      LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
          "Moving " + totalNumMoved + " regions off of " +
          serversOverloaded + " overloaded servers onto " +
          serversUnderloaded + " less loaded servers");
      return regionsToReturn;
    }

    // ---- Second chance: quotas remain, so take one region from each most-loaded server. ----
    if (neededRegions != 0) {
      for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
          serversByLoad.descendingMap().entrySet()) {
        BalanceInfo balanceInfo =
            serverBalanceInfo.get(server.getKey().getServerName());
        // For an overloaded server this is the first region not already offloaded above.
        int idx =
            balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
        if (idx >= server.getValue().size()) break;
        HRegionInfo region = server.getValue().get(idx);
        // Meta regions are never picked here.
        if (region.isMetaRegion()) continue;
        // NOTE(review): the source server's numRegionsAdded is not decremented for this move.
        regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
        totalNumMoved++;
        if (--neededRegions == 0) {
          break;
        }
      }
    }

    // ---- Fill servers still below min (ascending load). ----
    for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
        serversByLoad.entrySet()) {
      int regionCount = server.getKey().getLoad();
      // Ascending order: once the original load reaches min, later servers do too.
      if (regionCount >= min) break;
      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
      if(balanceInfo != null) {
        // Account for regions already planned onto this server in the distribution pass.
        regionCount += balanceInfo.getNumRegionsAdded();
      }
      if(regionCount >= min) {
        continue;
      }
      int numToTake = min - regionCount;
      int numTaken = 0;
      while(numTaken < numToTake && 0 < regionsToMove.size()) {
        addRegionPlan(regionsToMove, fetchFromTail,
            server.getKey().getServerName(), regionsToReturn);
        numTaken++;
      }
    }

    // ---- Leftovers: one region each to servers still below max (ascending load). ----
    if (0 < regionsToMove.size()) {
      for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
          serversByLoad.entrySet()) {
        int regionCount = server.getKey().getLoad();
        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
        if(balanceInfo != null) {
          // Only reflects offloads and the distribution pass; the fill pass above
          // does not update numRegionsAdded.
          regionCount += balanceInfo.getNumRegionsAdded();
        }
        if(regionCount >= max) {
          break;
        }
        addRegionPlan(regionsToMove, fetchFromTail,
            server.getKey().getServerName(), regionsToReturn);
        if (regionsToMove.isEmpty()) {
          break;
        }
      }
    }

    long endTime = System.currentTimeMillis();

    // The plan is imperfect (unplaced regions or unmet quota): dump the input for diagnosis.
    if (!regionsToMove.isEmpty() || neededRegions != 0) {
      LOG.warn("regionsToMove=" + totalNumMoved +
          ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
          ", serversUnderloaded=" + serversUnderloaded);
      StringBuilder sb = new StringBuilder();
      for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterMap.entrySet()) {
        if (sb.length() > 0) sb.append(", ");
        sb.append(e.getKey().toString());
        sb.append(" ");
        sb.append(e.getValue().size());
      }
      LOG.warn("Input " + sb.toString());
    }

    LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
        "Moving " + totalNumMoved + " regions off of " +
        serversOverloaded + " overloaded servers onto " +
        serversUnderloaded + " less loaded servers");

    return regionsToReturn;
  }

  /**
   * Pops one pending plan from {@code regionsToMove}, points it at {@code sn} and records it.
   *
   * @param regionsToMove queue of plans whose destination is not yet chosen. Every caller in
   *     this class checks that it is non-empty before calling.
   * @param fetchFromTail if true, pop via {@code removeLast()} (greatest element per the
   *     queue's comparator); otherwise via {@code remove()} (least element)
   * @param sn destination server assigned to the popped plan
   * @param regionsToReturn output list the completed plan is appended to
   */
  private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
      final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
    RegionPlan rp = null;
    if (!fetchFromTail) rp = regionsToMove.remove();
    else rp = regionsToMove.removeLast();
    rp.setDestination(sn);
    regionsToReturn.add(rp);
  }
}