/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;

/**
 * Makes decisions about the placement and movement of Regions across
 * RegionServers.
 *
 * <p>Cluster-wide load balancing will occur only when there are no regions in
 * transition and according to a fixed period of time using {@link #balanceCluster(Map)}.
 *
 * <p>On cluster startup, bulk assignment can be used to determine
 * locations for all Regions in a cluster.
 *
 * <p>This class produces plans for the
 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to execute.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SimpleLoadBalancer extends BaseLoadBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleLoadBalancer.class);
  private static final Random RANDOM = new Random(System.currentTimeMillis());

  private RegionInfoComparator riComparator = new RegionInfoComparator();
  private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
  private float avgLoadOverall;
  private List<ServerAndLoad> serverLoadList = new ArrayList<>();

  /**
   * Stores additional per-server information about the regions added/removed
   * during the run of the balancing algorithm.
   *
   * For servers that shed regions, we need to track which regions we have already
   * shed. <b>nextRegionForUnload</b> contains the index in the list of regions on
   * the server that is the next to be shed.
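   *
   * <b>numRegionsAdded</b> is the net number of regions the balancer has added to (or, when
   * negative, removed from) the server so far, and <b>hriList</b> is the list of regions the
   * server hosts for the table being balanced.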
   */
  static class BalanceInfo {

    private int nextRegionForUnload;
    private int numRegionsAdded;
    private List<RegionInfo> hriList;

    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded, List<RegionInfo> hriList) {
      this.nextRegionForUnload = nextRegionForUnload;
      this.numRegionsAdded = numRegionsAdded;
      this.hriList = hriList;
    }

    int getNextRegionForUnload() {
      return nextRegionForUnload;
    }

    int getNumRegionsAdded() {
      return numRegionsAdded;
    }

    void setNumRegionsAdded(int numAdded) {
      this.numRegionsAdded = numAdded;
    }

    List<RegionInfo> getHriList() {
      return hriList;
    }

    void setNextRegionForUnload(int nextRegionForUnload) {
      this.nextRegionForUnload = nextRegionForUnload;
    }

  }

  /**
   * Pass RegionStates and allow balancer to set the current cluster load.
   */
  void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
    serverLoadList.clear();
    Map<ServerName, Integer> server2LoadMap = new HashMap<>();
    float sum = 0;
    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad
        .entrySet()) {
      for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterEntry.getValue().entrySet()) {
        if (entry.getKey().equals(masterServerName)) {
          continue; // we shouldn't include master as potential assignee
        }
        int regionNum = entry.getValue().size();
        server2LoadMap.compute(entry.getKey(), (k, v) -> v == null ? regionNum : regionNum + v);
        sum += regionNum;
      }
    }
    server2LoadMap.forEach((k, v) -> {
      serverLoadList.add(new ServerAndLoad(k, v));
    });
    avgLoadOverall = sum / serverLoadList.size();
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    float originSlop = slop;
    float originOverallSlop = overallSlop;
    super.setConf(conf);
    LOG.info("Update configuration of SimpleLoadBalancer, previous slop is "
        + originSlop + ", current slop is " + slop + ", previous overallSlop is "
        + originOverallSlop + ", current overallSlop is " + overallSlop);
  }

  private void setLoad(List<ServerAndLoad> slList, int i, int loadChange) {
    ServerAndLoad newsl =
        new ServerAndLoad(slList.get(i).getServerName(), slList.get(i).getLoad() + loadChange);
    slList.set(i, newsl);
  }

  /**
   * A checker function to decide whether, when we want to balance overall and a certain table has
   * already been balanced, we still need to re-distribute regions of this table to achieve
   * overall balance.
   * @return true if this table should be balanced.
   */
  private boolean overallNeedsBalance() {
    int floor = (int) Math.floor(avgLoadOverall * (1 - overallSlop));
    int ceiling = (int) Math.ceil(avgLoadOverall * (1 + overallSlop));
    int max = 0, min = Integer.MAX_VALUE;
    for (ServerAndLoad server : serverLoadList) {
      max = Math.max(server.getLoad(), max);
      min = Math.min(server.getLoad(), min);
    }
    if (max <= ceiling && min >= floor) {
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because cluster is balanced at overall level");
      }
      return false;
    }
    return true;
  }

  /**
   * Generate a global load balancing plan according to the specified map of
   * server information to the most loaded regions of each server.
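   *
   * Besides per-table balance, overall (all tables) balance is also taken into account,
   * via {@link #overallNeedsBalance()} and {@link #balanceOverall}.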
   *
   * The load balancing invariant is that all servers are within 1 region of the
   * average number of regions per server. If the average is an integer number,
   * all servers will be balanced to the average. Otherwise, all servers will
   * have either floor(average) or ceiling(average) regions.
   *
   * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
   *   we can fetch from both ends of the queue.
   *   At the beginning, we check whether there was an empty region server
   *   just discovered by Master. If so, we alternately choose new / old
   *   regions from head / tail of regionsToMove, respectively. This alternation
   *   avoids clustering young regions on the newly discovered region server.
   *   Otherwise, we choose new regions from the head of regionsToMove.
   *
   * Another improvement from HBASE-3609 is that we assign regions from
   *   regionsToMove to underloaded servers in round-robin fashion.
   *   Previously one underloaded server would be filled before we move onto
   *   the next underloaded server, leading to clustering of young regions.
   *
   * Finally, we randomly shuffle underloaded servers so that they receive
   *   offloaded regions relatively evenly across calls to balanceCluster().
   *
   * The algorithm is currently implemented as such:
   *
   * <ol>
   * <li>Determine the two valid numbers of regions each server should have,
   *     <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
   *
   * <li>Iterate down the most loaded servers, shedding regions from each so
   *     each server hosts exactly <b>MAX</b> regions. Stop once you reach a
   *     server that already has <= <b>MAX</b> regions.
   *     <p>
   *     Order the regions to move from most recent to least.
   *
   * <li>Iterate down the least loaded servers, assigning regions so each server
   *     has exactly <b>MIN</b> regions. Stop once you reach a server that
   *     already has >= <b>MIN</b> regions.
   *
   *     Regions being assigned to underloaded servers are those that were shed
   *     in the previous step. It is possible that there were not enough
   *     regions shed to fill each underloaded server to <b>MIN</b>. If so we
   *     end up with a number of regions required to do so, <b>neededRegions</b>.
   *
   *     It is also possible that we were able to fill each underloaded server but
   *     ended up with regions that were shed from overloaded servers but that
   *     still do not have assignment.
   *
   *     If neither of these conditions hold (no regions needed to fill the
   *     underloaded servers, no regions leftover from overloaded servers),
   *     we are done and return. Otherwise we handle these cases below.
   *
   * <li>If <b>neededRegions</b> is non-zero (still have underloaded servers),
   *     we iterate the most loaded servers again, shedding a single region from
   *     each (this brings them from having <b>MAX</b> regions to having
   *     <b>MIN</b> regions).
   *
   * <li>We now definitely have more regions that need assignment, either from
   *     the previous step or from the original shedding from overloaded servers.
   *     Iterate the least loaded servers filling each to <b>MIN</b>.
   *
   * <li>If we still have more regions that need assignment, again iterate the
   *     least loaded servers, this time giving each one (filling them to
   *     <b>MAX</b>) until we run out.
   *
   * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions.
   *
   *     In addition, any server hosting >= <b>MAX</b> regions is guaranteed
   *     to end up with <b>MAX</b> regions at the end of the balancing. This
   *     ensures the minimal number of regions possible are moved.
   * </ol>
   *
   * TODO: We can at-most reassign the number of regions away from a particular
   *       server to be how many they report as most loaded.
   *       Should we just keep all assignment in memory? Any objections?
   *       Does this mean we need HeapSize on HMaster? Or just careful monitor?
   *       (current thinking is we will hold all assignments in memory)
   *
   * @param tableName the table to be balanced
   * @param loadOfOneTable Map of regionservers and their load/region information to
   *   a list of their most loaded regions
   * @return a list of regions to be moved, including source and destination,
   *         or null if cluster is already balanced
   */
  @Override
  public List<RegionPlan> balanceTable(TableName tableName,
      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
    List<RegionPlan> regionsToReturn = balanceMasterRegions(loadOfOneTable);
    if (regionsToReturn != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
      return regionsToReturn;
    }
    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
      if (loadOfOneTable.size() <= 2) {
        return null;
      }
      loadOfOneTable = new HashMap<>(loadOfOneTable);
      loadOfOneTable.remove(masterServerName);
    }

    long startTime = System.currentTimeMillis();

    // construct a Cluster object with clusterMap and rest of the
    // argument as defaults
    Cluster c = new Cluster(loadOfOneTable, null, this.regionFinder, this.rackManager);
    if (!this.needsBalance(tableName, c) && !this.overallNeedsBalance()) {
      return null;
    }
    ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
    int numServers = cs.getNumServers();
    NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
    int numRegions = cs.getNumRegions();
    float average = cs.getLoadAverage();
    int max = (int) Math.ceil(average);
    int min = (int) average;
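    // e.g. an average of 3.4 regions per server gives min = 3 and max = 4; after balancing,
    // every server should host either min or max regions of this table.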

    // Used to check the balance result.
    StringBuilder strBalanceParam = new StringBuilder();
    strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
        .append(", numServers=").append(numServers).append(", max=").append(max)
        .append(", min=").append(min);
    LOG.debug(strBalanceParam.toString());

    // Balance the cluster
    // TODO: Look at data block locality or a more complex load to do this
    MinMaxPriorityQueue<RegionPlan> regionsToMove =
        MinMaxPriorityQueue.orderedBy(rpComparator).create();
    regionsToReturn = new ArrayList<>();

    // Walk down most loaded, pruning each to the max
    int serversOverloaded = 0;
    // flag used to fetch regions from head and tail of list, alternately
    boolean fetchFromTail = false;
    Map<ServerName, BalanceInfo> serverBalanceInfo = new TreeMap<>();
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server:
        serversByLoad.descendingMap().entrySet()) {
      ServerAndLoad sal = server.getKey();
      int load = sal.getLoad();
      if (load <= max) {
        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue()));
        continue;
      }
      serversOverloaded++;
      List<RegionInfo> regions = server.getValue();
      int numToOffload = Math.min(load - max, regions.size());
      // account for the out-of-band regions which were assigned to this server
      // after some other region server crashed
      Collections.sort(regions, riComparator);
      int numTaken = 0;
      for (int i = 0; i <= numToOffload; ) {
        RegionInfo hri = regions.get(i); // fetch from head
        if (fetchFromTail) {
          hri = regions.get(regions.size() - 1 - i);
        }
        i++;
        // Don't rebalance special regions.
        if (shouldBeOnMaster(hri)
            && masterServerName.equals(sal.getServerName())) continue;
        regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
        numTaken++;
        if (numTaken >= numToOffload) break;
      }
      serverBalanceInfo.put(sal.getServerName(),
          new BalanceInfo(numToOffload, (-1) * numTaken, server.getValue()));
    }
    int totalNumMoved = regionsToMove.size();

    // Walk down least loaded, filling each to the min
    int neededRegions = 0; // number of regions needed to bring all up to min
    fetchFromTail = false;

    Map<ServerName, Integer> underloadedServers = new HashMap<>();
    int maxToTake = numRegions - min;
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server:
        serversByLoad.entrySet()) {
      if (maxToTake == 0) break; // no more to take
      int load = server.getKey().getLoad();
      if (load >= min) {
        continue; // look for other servers which haven't reached min
      }
      int regionsToPut = min - load;
      maxToTake -= regionsToPut;
      underloadedServers.put(server.getKey().getServerName(), regionsToPut);
    }
    // number of servers that get new regions
    int serversUnderloaded = underloadedServers.size();
    int incr = 1;
    List<ServerName> sns =
        Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
    Collections.shuffle(sns, RANDOM);
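    // Hand the shed regions to the underloaded servers one region per server per pass, flipping
    // the iteration direction each pass (see incr) so no single server is filled before the
    // others (HBASE-3609).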
    while (regionsToMove.size() > 0) {
      int cnt = 0;
      int i = incr > 0 ? 0 : underloadedServers.size() - 1;
      for (; i >= 0 && i < underloadedServers.size(); i += incr) {
        if (regionsToMove.isEmpty()) break;
        ServerName si = sns.get(i);
        int numToTake = underloadedServers.get(si);
        if (numToTake == 0) continue;

        addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);

        underloadedServers.put(si, numToTake - 1);
        cnt++;
        BalanceInfo bi = serverBalanceInfo.get(si);
        bi.setNumRegionsAdded(bi.getNumRegionsAdded() + 1);
      }
      if (cnt == 0) break;
      // iterates underloadedServers in the other direction
      incr = -incr;
    }
    for (Integer i : underloadedServers.values()) {
      // If we still want to take some, increment needed
      neededRegions += i;
    }

    // Need to do a second pass.
    // Either more regions to assign out or servers that are still underloaded

    // If we need more to fill min, grab one from each most loaded until enough
    if (neededRegions != 0) {
      // Walk down most loaded, grabbing one from each until we get enough
      for (Map.Entry<ServerAndLoad, List<RegionInfo>> server :
          serversByLoad.descendingMap().entrySet()) {
        BalanceInfo balanceInfo =
            serverBalanceInfo.get(server.getKey().getServerName());
        int idx =
            balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
        if (idx >= server.getValue().size()) break;
        RegionInfo region = server.getValue().get(idx);
        if (region.isMetaRegion()) continue; // Don't move meta regions.
        regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
        balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
        totalNumMoved++;
        if (--neededRegions == 0) {
          // No more regions needed, done shedding
          break;
        }
      }
    }

    // Now we have a set of regions that must be all assigned out
    // Assign each underloaded up to the min, then if leftovers, assign to max

    // Walk down least loaded, assigning to each to fill up to min
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server :
        serversByLoad.entrySet()) {
      int regionCount = server.getKey().getLoad();
      if (regionCount >= min) break;
      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
      if (balanceInfo != null) {
        regionCount += balanceInfo.getNumRegionsAdded();
      }
      if (regionCount >= min) {
        continue;
      }
      int numToTake = min - regionCount;
      int numTaken = 0;
      while (numTaken < numToTake && 0 < regionsToMove.size()) {
        addRegionPlan(regionsToMove, fetchFromTail,
            server.getKey().getServerName(), regionsToReturn);
        numTaken++;
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() + 1);
      }
    }

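    // When min != max, per-table balance alone can still leave the cluster skewed overall, so run
    // an extra pass (balanceOverall) that also accounts for each server's load across all tables.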
    if (min != max) {
      balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
    }

    long endTime = System.currentTimeMillis();

    if (!regionsToMove.isEmpty() || neededRegions != 0) {
      // Emit data so can diagnose how balancer went astray.
      LOG.warn("regionsToMove=" + totalNumMoved +
          ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
          ", serversUnderloaded=" + serversUnderloaded);
      StringBuilder sb = new StringBuilder();
      for (Map.Entry<ServerName, List<RegionInfo>> e: loadOfOneTable.entrySet()) {
        if (sb.length() > 0) sb.append(", ");
        sb.append(e.getKey().toString());
        sb.append(" ");
        sb.append(e.getValue().size());
      }
      LOG.warn("Input " + sb.toString());
    }

    // All done!
    LOG.info("Done. Calculated a load balance in " + (endTime - startTime) + "ms. " +
        "Moving " + totalNumMoved + " regions off of " +
        serversOverloaded + " overloaded servers onto " +
        serversUnderloaded + " less loaded servers");

    return regionsToReturn;
  }

  /**
   * If we need to balance overall, we need one more round to peel off one region from each
   * server that hosts the max number of regions. Together with the other regions left to be
   * assigned, we distribute all of regionsToMove to the RSes that have fewer regions in the
   * whole cluster scope.
   */
  public void balanceOverall(List<RegionPlan> regionsToReturn,
      Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
      MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min) {
    // Step 1.
    // A map to record the plans we already have, in order to resolve a cyclic assignment pair,
    // e.g. plan 1: A -> B, plan 2: B -> C => resolve plan 1 to A -> C, remove plan 2
    Map<ServerName, List<Integer>> returnMap = new HashMap<>();
    for (int i = 0; i < regionsToReturn.size(); i++) {
      List<Integer> pos = returnMap.get(regionsToReturn.get(i).getDestination());
      if (pos == null) {
        pos = new ArrayList<>();
        returnMap.put(regionsToReturn.get(i).getDestination(), pos);
      }
      pos.add(i);
    }

    // Step 2.
    // Peel off one region from each RS which has the max number of regions now.
    // Each RS should have either max or min numbers of regions for this table.
    for (int i = 0; i < serverLoadList.size(); i++) {
      ServerAndLoad serverload = serverLoadList.get(i);
      BalanceInfo balanceInfo = serverBalanceInfo.get(serverload.getServerName());
      if (balanceInfo == null) {
        continue;
      }
      setLoad(serverLoadList, i, balanceInfo.getNumRegionsAdded());
      if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() == max) {
        RegionInfo hriToPlan;
        if (balanceInfo.getHriList().isEmpty()) {
          LOG.debug("During balanceOverall, we found " + serverload.getServerName()
              + " has no RegionInfo, no operation needed");
          continue;
        } else if (balanceInfo.getNextRegionForUnload() >= balanceInfo.getHriList().size()) {
          continue;
        } else {
          hriToPlan = balanceInfo.getHriList().get(balanceInfo.getNextRegionForUnload());
        }
        RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
        regionsToMove.add(maxPlan);
        setLoad(serverLoadList, i, -1);
      } else if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
          || balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min) {
        LOG.warn("Encountered incorrect region numbers after calculating move plan during"
            + " balanceOverall; for this table, " + serverload.getServerName()
            + " originally has " + balanceInfo.getHriList().size() + " regions and "
            + balanceInfo.getNumRegionsAdded() + " regions have been added. Yet, max="
            + max + ", min=" + min + ". Thus stop balancing this table"); // should not happen
        return;
      }
    }

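    // At this point regionsToMove holds the leftovers from the per-table pass plus one region
    // peeled off each server sitting at max; Steps 3-5 assign them to the overall least-loaded
    // servers.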
    // Step 3. Sort the serverLoadList, the ArrayList that holds the overall load of each server.
    // We only need to assign the regionsToMove to
    // the first n = regionsToMove.size() RSes that have the least load.
    Collections.sort(serverLoadList, new Comparator<ServerAndLoad>() {
      @Override
      public int compare(ServerAndLoad s1, ServerAndLoad s2) {
        if (s1.getLoad() == s2.getLoad()) return 0;
        else return (s1.getLoad() > s2.getLoad()) ? 1 : -1;
      }
    });

    // Step 4.
    // Preparation before assigning out all regionsToMove.
    // We need to remove the plans whose source RS equals the destination RS,
    // since the source RS belongs to the least n loaded RSes.
    int assignLength = regionsToMove.size();
    // A structure to help map a ServerName to its load and index in serverLoadList
    Map<ServerName, Pair<ServerAndLoad, Integer>> SnLoadMap = new HashMap<>();
    for (int i = 0; i < serverLoadList.size(); i++) {
      SnLoadMap.put(serverLoadList.get(i).getServerName(), new Pair<>(serverLoadList.get(i), i));
    }
    Pair<ServerAndLoad, Integer> shredLoad;
    // A list to help mark the plans in regionsToMove that should be removed
    List<RegionPlan> planToRemoveList = new ArrayList<>();
    // A structure to record how many times a server becomes the source of a plan in regionsToMove.
    Map<ServerName, Integer> sourceMap = new HashMap<>();
    // We remove one of the plans which would cause the source RS to equal the destination RS.
    // But we should keep in mind that a second plan from such an RS should be kept.
    for (RegionPlan plan : regionsToMove) {
      // the source RS's load and index in serverLoadList
      shredLoad = SnLoadMap.get(plan.getSource());
      if (!sourceMap.containsKey(plan.getSource())) sourceMap.put(plan.getSource(), 0);
      sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
      if (shredLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
        planToRemoveList.add(plan);
        // While marked as to be removed, the count should be added back to the source RS
        setLoad(serverLoadList, shredLoad.getSecond(), 1);
      }
    }
    // Remove those marked plans from regionsToMove;
    // we cannot remove them directly while iterating through
    // regionsToMove, due to the fact that regionsToMove is a MinMaxPriorityQueue.
    for (RegionPlan planToRemove : planToRemoveList) {
      regionsToMove.remove(planToRemove);
    }

    // Step 5.
    // We only need to assign the regionsToMove to
    // the first n = regionsToMove.size() of them, with the least load.
    // With this strategy adopted, we can gradually achieve the overall balance,
    // while keeping the table level balanced.
    for (int i = 0; i < assignLength; i++) {
      // skip the RSes that are also sources; we removed their plans from regionsToMove in the
      // previous step
      if (sourceMap.containsKey(serverLoadList.get(i).getServerName())) continue;
      addRegionPlan(regionsToMove, fetchFromTail,
          serverLoadList.get(i).getServerName(), regionsToReturn);
      setLoad(serverLoadList, i, 1);
      // resolve a possible cyclic assignment pair if we just produced one:
      // e.g. plan1: A -> B, plan2: B -> C => resolve plan1 to A -> C and remove plan2
      List<Integer> pos =
          returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
      if (pos != null && pos.size() != 0) {
        regionsToReturn.get(pos.get(pos.size() - 1)).setDestination(
            regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
        pos.remove(pos.size() - 1);
        regionsToReturn.remove(regionsToReturn.size() - 1);
      }
    }
    // Done balancing overall
  }

  /**
   * Add a region from the head or tail to the List of regions to return.
   */
  private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
      final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
    RegionPlan rp = null;
    if (!fetchFromTail) rp = regionsToMove.remove();
    else rp = regionsToMove.removeLast();
    rp.setDestination(sn);
    regionsToReturn.add(rp);
  }

  /**
   * Override to invoke {@link #setClusterLoad} before balancing. We need the cluster load of all
   * regions on every server in order to achieve overall balance.
   */
  @Override
  public synchronized List<RegionPlan>
      balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
    setClusterLoad(loadOfAllTable);
    return super.balanceCluster(loadOfAllTable);
  }
}