/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import com.google.errorprone.annotations.RestrictedApi;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;

/**
 * Makes decisions about the placement and movement of Regions across RegionServers.
 * <p/>
 * Cluster-wide load balancing occurs only when there are no regions in transition, and is run at a
 * fixed period of time using {@link #balanceCluster(Map)}.
 * <p/>
 * On cluster startup, bulk assignment can be used to determine locations for all Regions in a
 * cluster.
 * <p/>
 * This class produces plans for the {@code AssignmentManager} to execute.
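 * <p/>
 * A minimal configuration sketch (assuming the standard balancer configuration keys; shown for
 * illustration only, defaults may differ between versions):
 *
 * <pre>
 * conf.set("hbase.master.loadbalancer.class",
 *   "org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer");
 * conf.setFloat("hbase.regions.slop", 0.2f); // per-table sloppiness threshold
 * conf.setFloat("hbase.regions.overallSlop", 0.2f); // cluster-wide threshold, see loadConf
 * </pre>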
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SimpleLoadBalancer extends BaseLoadBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleLoadBalancer.class);

  private RegionInfoComparator riComparator = new RegionInfoComparator();
  private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
  private float avgLoadOverall;
  private List<ServerAndLoad> serverLoadList = new ArrayList<>();
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  private float overallSlop;

  /**
   * Stores additional per-server information about the regions added/removed during the run of the
   * balancing algorithm.
   * <p/>
   * For servers that shed regions, we need to track which regions we have already shed.
   * <b>nextRegionForUnload</b> contains the index in the list of regions on the server that is the
   * next to be shed.
   */
  private static final class BalanceInfo {

    private int nextRegionForUnload;
    private int numRegionsAdded;
    private List<RegionInfo> hriList;

    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded, List<RegionInfo> hriList) {
      this.nextRegionForUnload = nextRegionForUnload;
      this.numRegionsAdded = numRegionsAdded;
      this.hriList = hriList;
    }

    int getNextRegionForUnload() {
      return nextRegionForUnload;
    }

    int getNumRegionsAdded() {
      return numRegionsAdded;
    }

    void setNumRegionsAdded(int numAdded) {
      this.numRegionsAdded = numAdded;
    }

    List<RegionInfo> getHriList() {
      return hriList;
    }

    void setNextRegionForUnload(int nextRegionForUnload) {
      this.nextRegionForUnload = nextRegionForUnload;
    }

  }

  /**
   * Pass RegionStates and allow balancer to set the current cluster load.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*(/src/test/.*|SimpleLoadBalancer).java")
  void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
    serverLoadList.clear();
    Map<ServerName, Integer> server2LoadMap = new HashMap<>();
    float sum = 0;
    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad
      .entrySet()) {
      for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterEntry.getValue().entrySet()) {
        int regionNum = entry.getValue().size();
        server2LoadMap.compute(entry.getKey(), (k, v) -> v == null ? regionNum : regionNum + v);
        sum += regionNum;
      }
    }
    server2LoadMap.forEach((k, v) -> {
      serverLoadList.add(new ServerAndLoad(k, v));
    });
    avgLoadOverall = sum / serverLoadList.size();
  }
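  // Worked example of the aggregation in setClusterLoad (illustrative values only): given table
  // T1 with {s1: 3 regions, s2: 1 region} and table T2 with {s1: 1, s2: 3}, server2LoadMap ends
  // up as {s1: 4, s2: 4}, sum = 8, and avgLoadOverall = 8 / 2 = 4.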
  @Override
  protected void
    preBalanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
    // We need the cluster load of all regions on every server to achieve overall balance
    setClusterLoad(loadOfAllTable);
  }

  @Override
  protected void loadConf(Configuration conf) {
    super.loadConf(conf);
    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    float originSlop = slop;
    float originOverallSlop = overallSlop;
    loadConf(conf);
    LOG.info(
      "Update configuration of SimpleLoadBalancer, previous slop is {},"
        + " current slop is {}, previous overallSlop is {}, current overallSlop is {}",
      originSlop, slop, originOverallSlop, overallSlop);
  }

  private void setLoad(List<ServerAndLoad> slList, int i, int loadChange) {
    ServerAndLoad newsl =
      new ServerAndLoad(slList.get(i).getServerName(), slList.get(i).getLoad() + loadChange);
    slList.set(i, newsl);
  }

  /**
   * A checker function to decide whether, when we want to balance overall and a certain table has
   * already been balanced, we still need to re-distribute regions of this table to achieve overall
   * balance.
   * @return true if this table should be balanced.
   */
  private boolean overallNeedsBalance() {
    int floor = (int) Math.floor(avgLoadOverall * (1 - overallSlop));
    int ceiling = (int) Math.ceil(avgLoadOverall * (1 + overallSlop));
    int max = 0, min = Integer.MAX_VALUE;
    for (ServerAndLoad server : serverLoadList) {
      max = Math.max(server.getLoad(), max);
      min = Math.min(server.getLoad(), min);
    }
    if (max <= ceiling && min >= floor) {
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because cluster is balanced at overall level");
      }
      return false;
    }
    return true;
  }
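  // Worked example for overallNeedsBalance (illustrative numbers): with avgLoadOverall = 10 and
  // overallSlop = 0.2, floor = floor(10 * 0.8) = 8 and ceiling = ceil(10 * 1.2) = 12, so the
  // cluster counts as balanced at the overall level iff every server holds 8 to 12 regions.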
  private boolean needsBalance(BalancerClusterState c) {
    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
      }
      return false;
    }
    if (idleRegionServerExist(c)) {
      return true;
    }
    // Check if we even need to do any load balancing
    // HBASE-3681 check sloppiness first
    return sloppyRegionServerExist(cs);
  }

  /**
   * Generate a global load balancing plan according to the specified map of server information to
   * the most loaded regions of each server. The load balancing invariant is that all servers are
   * within 1 region of the average number of regions per server. If the average is an integer
   * number, all servers will be balanced to the average. Otherwise, all servers will have either
   * floor(average) or ceiling(average) regions.
   * <p>
   * HBASE-3609 modeled regionsToMove using Guava's MinMaxPriorityQueue so that we can fetch from
   * both ends of the queue. At the beginning, we check whether there are empty region servers just
   * discovered by the Master. If so, we alternately choose new / old regions from head / tail of
   * regionsToMove, respectively. This alternation avoids clustering young regions on the newly
   * discovered region server. Otherwise, we choose new regions from the head of regionsToMove.
   * Another improvement from HBASE-3609 is that we assign regions from regionsToMove to
   * underloaded servers in round-robin fashion. Previously one underloaded server would be filled
   * before we moved onto the next underloaded server, leading to clustering of young regions.
   * Finally, we randomly shuffle underloaded servers so that they receive offloaded regions
   * relatively evenly across calls to balanceCluster().
   * <p>
   * The algorithm is currently implemented as such:
   * <ol>
   * <li>Determine the two valid numbers of regions each server should have,
   * <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
   * <li>Iterate down the most loaded servers, shedding regions from each so each server hosts
   * exactly <b>MAX</b> regions. Stop once you reach a server that already has <= <b>MAX</b>
   * regions.
   * <p>
   * Order the regions to move from most recent to least.
   * <li>Iterate down the least loaded servers, assigning regions so each server has exactly
   * <b>MIN</b> regions. Stop once you reach a server that already has >= <b>MIN</b> regions.
   * Regions being assigned to underloaded servers are those that were shed in the previous step.
   * It is possible that there were not enough regions shed to fill each underloaded server to
   * <b>MIN</b>. If so, we end up with the number of regions still required to do so,
   * <b>neededRegions</b>. It is also possible that we were able to fill each underloaded server
   * but ended up with regions that were unassigned from overloaded servers and that still do not
   * have assignment. If neither of these conditions holds (no regions needed to fill the
   * underloaded servers, no regions leftover from overloaded servers), we are done and return.
   * Otherwise we handle these cases below (see the worked example after this list).
   * <li>If <b>neededRegions</b> is non-zero (still have underloaded servers), we iterate the most
   * loaded servers again, shedding a single region from each (this brings them from having
   * <b>MAX</b> regions to having <b>MIN</b> regions).
   * <li>We now definitely have more regions that need assignment, either from the previous step
   * or from the original shedding from overloaded servers. Iterate the least loaded servers,
   * filling each to <b>MIN</b>.
   * <li>If we still have more regions that need assignment, again iterate the least loaded
   * servers, this time giving each one (filling them to <b>MAX</b>) until we run out.
   * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions. In addition, any
   * server hosting >= <b>MAX</b> regions is guaranteed to end up with <b>MAX</b> regions at the
   * end of the balancing. This ensures the minimal number of regions possible are moved.
   * </ol>
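   * <p>
   * A small worked example of steps 1-3 (illustrative numbers only): with three servers holding
   * 4, 3 and 1 regions, average = 8/3, so MIN = 2 and MAX = 3. Step 2 sheds one region from the
   * first server (4 -> 3); step 3 assigns that region to the third server (1 -> 2). The loads
   * become 3, 3, 2, every server is within [MIN, MAX], and steps 4-6 have nothing left to do.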
   * TODO: We can at-most reassign the number of regions away from a particular server to be how
   * many they report as most loaded. Should we just keep all assignment in memory? Any objections?
   * Does this mean we need HeapSize on HMaster? Or just careful monitor? (current thinking is we
   * will hold all assignments in memory)
   * @param loadOfOneTable Map of regionservers and their load/region information to a list of
   *                       their most loaded regions
   * @return a list of regions to be moved, including source and destination, or null if cluster is
   *         already balanced
   */
  @Override
  protected List<RegionPlan> balanceTable(TableName tableName,
    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
    long startTime = EnvironmentEdgeManager.currentTime();

    // construct a Cluster object with clusterMap and rest of the
    // argument as defaults
    BalancerClusterState c =
      new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager);
    if (!needsBalance(c) && !this.overallNeedsBalance()) {
      return null;
    }
    ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
    int numServers = cs.getNumServers();
    NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
    int numRegions = cs.getNumRegions();
    float average = cs.getLoadAverage();
    int max = (int) Math.ceil(average);
    int min = (int) average;

    // Used to log the balance parameters for this run.
    StringBuilder strBalanceParam = new StringBuilder();
    strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
      .append(", numServers=").append(numServers).append(", max=").append(max).append(", min=")
      .append(min);
    LOG.debug(strBalanceParam.toString());

    // Balance the cluster
    // TODO: Look at data block locality or a more complex load to do this
    MinMaxPriorityQueue<RegionPlan> regionsToMove =
      MinMaxPriorityQueue.orderedBy(rpComparator).create();
    List<RegionPlan> regionsToReturn = new ArrayList<>();

    // Walk down most loaded, pruning each to the max
    int serversOverloaded = 0;
    // flag used to fetch regions from head and tail of list, alternately
    boolean fetchFromTail = false;
    Map<ServerName, BalanceInfo> serverBalanceInfo = new TreeMap<>();
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
      .entrySet()) {
      ServerAndLoad sal = server.getKey();
      int load = sal.getLoad();
      if (load <= max) {
        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue()));
        continue;
      }
      serversOverloaded++;
      List<RegionInfo> regions = server.getValue();
      int numToOffload = Math.min(load - max, regions.size());
      // account for the out-of-band regions which were assigned to this server
      // after some other region server crashed
      Collections.sort(regions, riComparator);
      int numTaken = 0;
      for (int i = 0; i <= numToOffload;) {
        RegionInfo hri = regions.get(i); // fetch from head
        if (fetchFromTail) {
          hri = regions.get(regions.size() - 1 - i);
        }
        i++;
        regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
        numTaken++;
        if (numTaken >= numToOffload) {
          break;
        }
      }
      serverBalanceInfo.put(sal.getServerName(),
        new BalanceInfo(numToOffload, -numTaken, server.getValue()));
    }
    int totalNumMoved = regionsToMove.size();
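    // At this point regionsToMove holds every region shed from the overloaded servers; e.g.
    // (illustrative) with max = 3 and server loads [5, 4, 3], the two overloaded servers
    // contribute 2 + 1 = 3 plans and totalNumMoved = 3.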
    // Walk down least loaded, filling each to the min
    int neededRegions = 0; // number of regions needed to bring all up to min
    fetchFromTail = false;

    Map<ServerName, Integer> underloadedServers = new HashMap<>();
    int maxToTake = numRegions - min;
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
      if (maxToTake == 0) {
        break; // no more to take
      }
      int load = server.getKey().getLoad();
      if (load >= min) {
        continue; // look for other servers which haven't reached min
      }
      int regionsToPut = min - load;
      maxToTake -= regionsToPut;
      underloadedServers.put(server.getKey().getServerName(), regionsToPut);
    }
    // number of servers that get new regions
    int serversUnderloaded = underloadedServers.size();
    int incr = 1;
    List<ServerName> sns =
      Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
    Collections.shuffle(sns);
    while (regionsToMove.size() > 0) {
      int cnt = 0;
      int i = incr > 0 ? 0 : underloadedServers.size() - 1;
      for (; i >= 0 && i < underloadedServers.size(); i += incr) {
        if (regionsToMove.isEmpty()) {
          break;
        }
        ServerName si = sns.get(i);
        int numToTake = underloadedServers.get(si);
        if (numToTake == 0) {
          continue;
        }

        addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);

        underloadedServers.put(si, numToTake - 1);
        cnt++;
        BalanceInfo bi = serverBalanceInfo.get(si);
        bi.setNumRegionsAdded(bi.getNumRegionsAdded() + 1);
      }
      if (cnt == 0) {
        break;
      }
      // iterates underloadedServers in the other direction
      incr = -incr;
    }
    for (Integer i : underloadedServers.values()) {
      // If we still want to take some, increment needed
      neededRegions += i;
    }
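    // Example of the round-robin pass above (illustrative): with shuffled underloaded servers
    // [s1, s2, s3] and incr flipping sign after each sweep, plans are handed out in the order
    // s1, s2, s3 on the first sweep and s3, s2, s1 on the next, so no single server is filled
    // before the others receive anything.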
    // Need to do a second pass.
    // Either more regions to assign out or servers that are still underloaded

    // If we need more to fill min, grab one from each most loaded until enough
    if (neededRegions != 0) {
      // Walk down most loaded, grabbing one from each until we get enough
      for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
        .entrySet()) {
        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
        int idx = balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
        if (idx >= server.getValue().size()) {
          break;
        }
        RegionInfo region = server.getValue().get(idx);
        if (region.isMetaRegion()) {
          continue; // Don't move meta regions.
        }
        regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
        balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
        totalNumMoved++;
        if (--neededRegions == 0) {
          // No more regions needed, done shedding
          break;
        }
      }
    }

    // Now we have a set of regions that must be all assigned out
    // Assign each underloaded up to the min, then if leftovers, assign to max

    // Walk down least loaded, assigning to each to fill up to min
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
      int regionCount = server.getKey().getLoad();
      if (regionCount >= min) {
        break;
      }
      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
      if (balanceInfo != null) {
        regionCount += balanceInfo.getNumRegionsAdded();
      }
      if (regionCount >= min) {
        continue;
      }
      int numToTake = min - regionCount;
      int numTaken = 0;
      while (numTaken < numToTake && 0 < regionsToMove.size()) {
        addRegionPlan(regionsToMove, fetchFromTail, server.getKey().getServerName(),
          regionsToReturn);
        numTaken++;
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() + 1);
      }
    }

    if (min != max) {
      balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
    }

    long endTime = EnvironmentEdgeManager.currentTime();

    if (!regionsToMove.isEmpty() || neededRegions != 0) {
      // Emit data so we can diagnose how the balancer went astray.
      LOG.warn(
        "regionsToMove=" + totalNumMoved + ", numServers=" + numServers + ", serversOverloaded="
          + serversOverloaded + ", serversUnderloaded=" + serversUnderloaded);
      StringBuilder sb = new StringBuilder();
      for (Map.Entry<ServerName, List<RegionInfo>> e : loadOfOneTable.entrySet()) {
        if (sb.length() > 0) {
          sb.append(", ");
        }
        sb.append(e.getKey().toString());
        sb.append(" ");
        sb.append(e.getValue().size());
      }
      LOG.warn("Input " + sb.toString());
    }

    // All done!
    LOG.info("Done. Calculated a load balance in " + (endTime - startTime) + "ms. " + "Moving "
      + totalNumMoved + " regions off of " + serversOverloaded + " overloaded servers onto "
      + serversUnderloaded + " less loaded servers");

    return regionsToReturn;
  }

  /**
   * If we need to balance overall, we need one more round to peel off one region from each server
   * that is at max. Together with the other regions left to be assigned, we distribute all of
   * regionsToMove to the region servers that have fewer regions in the whole cluster scope.
   */
  private void balanceOverall(List<RegionPlan> regionsToReturn,
    Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
    MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min) {
    // Step 1.
    // A map to record the plans we already have, in order to resolve a cyclic assignment pair
    // later, e.g. plan 1: A -> B, plan 2: B -> C => resolve plan 1 to A -> C, remove plan 2.
    Map<ServerName, List<Integer>> returnMap = new HashMap<>();
    for (int i = 0; i < regionsToReturn.size(); i++) {
      List<Integer> pos = returnMap.get(regionsToReturn.get(i).getDestination());
      if (pos == null) {
        pos = new ArrayList<>();
        returnMap.put(regionsToReturn.get(i).getDestination(), pos);
      }
      pos.add(i);
    }
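    // For instance (illustrative), if regionsToReturn currently holds plan 0: A -> B and
    // plan 1: C -> B, then returnMap maps B -> [0, 1]; these indices let Step 5 rewrite a plan
    // in place when a cyclic pair shows up.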
    // Step 2.
    // Peel off one region from each RS which has max number of regions now.
    // Each RS should have either max or min numbers of regions for this table.
    for (int i = 0; i < serverLoadList.size(); i++) {
      ServerAndLoad serverload = serverLoadList.get(i);
      BalanceInfo balanceInfo = serverBalanceInfo.get(serverload.getServerName());
      if (balanceInfo == null) {
        continue;
      }
      setLoad(serverLoadList, i, balanceInfo.getNumRegionsAdded());
      if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() == max) {
        RegionInfo hriToPlan;
        if (balanceInfo.getHriList().isEmpty()) {
          LOG.debug("During balanceOverall, we found " + serverload.getServerName()
            + " has no RegionInfo, no operation needed");
          continue;
        } else if (balanceInfo.getNextRegionForUnload() >= balanceInfo.getHriList().size()) {
          continue;
        } else {
          hriToPlan = balanceInfo.getHriList().get(balanceInfo.getNextRegionForUnload());
        }
        RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
        regionsToMove.add(maxPlan);
        setLoad(serverLoadList, i, -1);
      } else if (
        balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
          || balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min
      ) {
        LOG.warn(
          "Encountered incorrect region numbers after calculating move plan during balanceOverall, "
            + "for this table, " + serverload.getServerName() + " originally has "
            + balanceInfo.getHriList().size() + " regions and " + balanceInfo.getNumRegionsAdded()
            + " regions have been added. Yet, max=" + max + ", min=" + min
            + ". Thus stopping balance for this table"); // should not happen
        return;
      }
    }

    // Step 3. Sort the serverLoadList, the ArrayList holding the overall load of each server.
    // We only need to assign the regionsToMove to
    // the first n = regionsToMove.size() RSes that have the least load.
    Collections.sort(serverLoadList, new Comparator<ServerAndLoad>() {
      @Override
      public int compare(ServerAndLoad s1, ServerAndLoad s2) {
        if (s1.getLoad() == s2.getLoad()) {
          return 0;
        } else {
          return (s1.getLoad() > s2.getLoad()) ? 1 : -1;
        }
      }
    });

    // Step 4.
    // Preparation before assigning out all regionsToMove.
    // We need to remove the plans whose source RS equals the destination RS,
    // since the source RS belongs to the least n loaded RSes.
    int assignLength = regionsToMove.size();
    // A structure to help map a ServerName to its load and index in serverLoadList
    Map<ServerName, Pair<ServerAndLoad, Integer>> snLoadMap = new HashMap<>();
    for (int i = 0; i < serverLoadList.size(); i++) {
      snLoadMap.put(serverLoadList.get(i).getServerName(), new Pair<>(serverLoadList.get(i), i));
    }
    Pair<ServerAndLoad, Integer> shedLoad;
    // A List to help mark the plans in regionsToMove that should be removed
    List<RegionPlan> planToRemoveList = new ArrayList<>();
    // A structure to record how many times a server becomes the source of a plan, from
    // regionsToMove.
    Map<ServerName, Integer> sourceMap = new HashMap<>();
    // We remove one of the plans which would cause the source RS to equal the destination RS.
    // But we should keep in mind that the second plan from such an RS should be kept.
    for (RegionPlan plan : regionsToMove) {
      // the source RS's load and index in serverLoadList
      shedLoad = snLoadMap.get(plan.getSource());
      if (!sourceMap.containsKey(plan.getSource())) {
        sourceMap.put(plan.getSource(), 0);
      }
      sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
      if (shedLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
        planToRemoveList.add(plan);
        // While marked as to be removed, the count should be added back to the source RS
        setLoad(serverLoadList, shedLoad.getSecond(), 1);
      }
    }
    // Remove those marked plans from regionsToMove; we cannot remove them directly while
    // iterating through regionsToMove, due to the fact that regionsToMove is a
    // MinMaxPriorityQueue.
    for (RegionPlan planToRemove : planToRemoveList) {
      regionsToMove.remove(planToRemove);
    }
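    // Example for Step 4 (illustrative): if server S is among the assignLength least loaded
    // servers and a plan in regionsToMove also names S as its source, assigning that plan back
    // to S would be a no-op move, so the first such plan per source is dropped and S simply
    // keeps that region.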
    // Step 5.
    // We only need to assign the regionsToMove to
    // the first n = regionsToMove.size() of them, with the least load.
    // With this strategy adopted, we can gradually achieve the overall balance,
    // while keeping the table level balanced.
    for (int i = 0; i < assignLength; i++) {
      // skip the RSes that are also sources; we have removed them from regionsToMove in the
      // previous step
      if (sourceMap.containsKey(serverLoadList.get(i).getServerName())) {
        continue;
      }
      addRegionPlan(regionsToMove, fetchFromTail, serverLoadList.get(i).getServerName(),
        regionsToReturn);
      setLoad(serverLoadList, i, 1);
      // resolve a possible cyclic assignment pair if we just produced one:
      // e.g. plan1: A -> B, plan2: B -> C => resolve plan1 to A -> C and remove plan2
      List<Integer> pos =
        returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
      if (pos != null && pos.size() != 0) {
        regionsToReturn.get(pos.get(pos.size() - 1))
          .setDestination(regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
        pos.remove(pos.size() - 1);
        regionsToReturn.remove(regionsToReturn.size() - 1);
      }
    }
    // Done balancing overall
  }

  /**
   * Add a region from the head or tail to the List of regions to return.
   */
  private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
    final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
    RegionPlan rp = null;
    if (!fetchFromTail) {
      rp = regionsToMove.remove();
    } else {
      rp = regionsToMove.removeLast();
    }
    rp.setDestination(sn);
    regionsToReturn.add(rp);
  }
}