/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import com.google.errorprone.annotations.RestrictedApi;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;

/**
 * Makes decisions about the placement and movement of Regions across RegionServers.
 * <p/>
 * Cluster-wide load balancing will occur only when there are no regions in transition, and is run
 * at a fixed period of time using {@link #balanceCluster(Map)}.
 * <p/>
 * On cluster startup, bulk assignment can be used to determine locations for all Regions in a
 * cluster.
 * <p/>
 * This class produces plans for the
 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to execute.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SimpleLoadBalancer extends BaseLoadBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleLoadBalancer.class);

  private RegionInfoComparator riComparator = new RegionInfoComparator();
  private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
  private float avgLoadOverall;
  private List<ServerAndLoad> serverLoadList = new ArrayList<>();
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  private float overallSlop;

  /**
   * Stores additional per-server information about the regions added/removed during the run of the
   * balancing algorithm.
   * <p/>
   * For servers that shed regions, we need to track which regions we have already shed.
   * <b>nextRegionForUnload</b> contains the index in the list of regions on the server that is the
   * next to be shed.
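   * <b>numRegionsAdded</b> tracks the net number of regions added to (positive) or shed from
   * (negative) the server so far in this run.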
   */
  private static final class BalanceInfo {

    private int nextRegionForUnload;
    private int numRegionsAdded;
    private List<RegionInfo> hriList;

    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded, List<RegionInfo> hriList) {
      this.nextRegionForUnload = nextRegionForUnload;
      this.numRegionsAdded = numRegionsAdded;
      this.hriList = hriList;
    }

    int getNextRegionForUnload() {
      return nextRegionForUnload;
    }

    int getNumRegionsAdded() {
      return numRegionsAdded;
    }

    void setNumRegionsAdded(int numAdded) {
      this.numRegionsAdded = numAdded;
    }

    List<RegionInfo> getHriList() {
      return hriList;
    }

    void setNextRegionForUnload(int nextRegionForUnload) {
      this.nextRegionForUnload = nextRegionForUnload;
    }

  }

  /**
   * Pass RegionStates and allow the balancer to set the current cluster load.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*(/src/test/.*|SimpleLoadBalancer).java")
  void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
    serverLoadList.clear();
    Map<ServerName, Integer> server2LoadMap = new HashMap<>();
    float sum = 0;
    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad
      .entrySet()) {
      for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterEntry.getValue().entrySet()) {
        if (entry.getKey().equals(masterServerName)) {
          continue; // we shouldn't include the master as a potential assignee
        }
        int regionNum = entry.getValue().size();
        server2LoadMap.compute(entry.getKey(), (k, v) -> v == null ? regionNum : regionNum + v);
        sum += regionNum;
      }
    }
    server2LoadMap.forEach((k, v) -> {
      serverLoadList.add(new ServerAndLoad(k, v));
    });
    avgLoadOverall = sum / serverLoadList.size();
  }

  @Override
  protected void
    preBalanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
    // We need the load of all regions on every server to achieve overall balance
    setClusterLoad(loadOfAllTable);
  }

  @Override
  protected void loadConf(Configuration conf) {
    super.loadConf(conf);
    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    float originSlop = slop;
    float originOverallSlop = overallSlop;
    loadConf(conf);
    LOG.info(
      "Updated configuration of SimpleLoadBalancer: previous slop is {},"
        + " current slop is {}, previous overallSlop is {}, current overallSlop is {}",
      originSlop, slop, originOverallSlop, overallSlop);
  }

  private void setLoad(List<ServerAndLoad> slList, int i, int loadChange) {
    ServerAndLoad newsl =
      new ServerAndLoad(slList.get(i).getServerName(), slList.get(i).getLoad() + loadChange);
    slList.set(i, newsl);
  }

  /**
   * A checker function used when balancing overall: after a certain table has been balanced,
   * decides whether we still need to re-distribute this table's regions to reach the state of
   * overall balance across the cluster.
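   * <p/>
   * The cluster counts as balanced at the overall level when every server's total load falls
   * within [floor(avgLoadOverall * (1 - overallSlop)), ceil(avgLoadOverall * (1 + overallSlop))].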
   * @return true if this table should be balanced.
   */
  private boolean overallNeedsBalance() {
    int floor = (int) Math.floor(avgLoadOverall * (1 - overallSlop));
    int ceiling = (int) Math.ceil(avgLoadOverall * (1 + overallSlop));
    int max = 0, min = Integer.MAX_VALUE;
    for (ServerAndLoad server : serverLoadList) {
      max = Math.max(server.getLoad(), max);
      min = Math.min(server.getLoad(), min);
    }
    if (max <= ceiling && min >= floor) {
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because cluster is balanced at overall level");
      }
      return false;
    }
    return true;
  }

  private boolean needsBalance(BalancerClusterState c) {
    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
      }
      return false;
    }
    if (idleRegionServerExist(c)) {
      return true;
    }
    // Check if we even need to do any load balancing
    // HBASE-3681 check sloppiness first
    return sloppyRegionServerExist(cs);
  }

  /**
   * Generate a global load balancing plan according to the specified map of server information to
   * the most loaded regions of each server. The load balancing invariant is that all servers are
   * within 1 region of the average number of regions per server. If the average is an integer
   * number, all servers will be balanced to the average. Otherwise, all servers will have either
   * floor(average) or ceiling(average) regions.
   * <p/>
   * HBASE-3609 modeled regionsToMove using Guava's MinMaxPriorityQueue so that we can fetch from
   * both ends of the queue. At the beginning, we check whether there is an empty region server
   * just discovered by the Master. If so, we alternately choose new / old regions from the head /
   * tail of regionsToMove, respectively. This alternation avoids clustering young regions on the
   * newly discovered region server. Otherwise, we choose new regions from the head of
   * regionsToMove. Another improvement from HBASE-3609 is that we assign regions from
   * regionsToMove to underloaded servers in round-robin fashion. Previously one underloaded
   * server would be filled before we moved on to the next underloaded server, leading to
   * clustering of young regions. Finally, we randomly shuffle underloaded servers so that they
   * receive offloaded regions relatively evenly across calls to balanceCluster().
   * <p/>
   * The algorithm is currently implemented as such:
   * <ol>
   * <li>Determine the two valid numbers of regions each server should have,
   * <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
   * <li>Iterate down the most loaded servers, shedding regions from each so each server hosts
   * exactly <b>MAX</b> regions. Stop once you reach a server that already has &lt;= <b>MAX</b>
   * regions.
   * <p>
   * Order the regions to move from most recent to least.
   * <li>Iterate down the least loaded servers, assigning regions so each server has exactly
   * <b>MIN</b> regions. Stop once you reach a server that already has &gt;= <b>MIN</b> regions.
   * Regions being assigned to underloaded servers are those that were shed in the previous step.
   * It is possible that there were not enough regions shed to fill each underloaded server to
   * <b>MIN</b>. If so we end up with a number of regions required to do so, <b>neededRegions</b>.
   * It is also possible that we filled each underloaded server but still have regions that were
   * shed from overloaded servers and remain unassigned. If neither of these conditions holds (no
   * regions needed to fill the underloaded servers, no regions leftover from overloaded servers),
   * we are done and return. Otherwise we handle these cases below.
   * <li>If <b>neededRegions</b> is non-zero (we still have underloaded servers), we iterate the
   * most loaded servers again, shedding a single region from each (this brings them from having
   * <b>MAX</b> regions to having <b>MIN</b> regions).
   * <li>We now definitely have more regions that need assignment, either from the previous step
   * or from the original shedding from overloaded servers. Iterate the least loaded servers
   * filling each to <b>MIN</b>.
   * <li>If we still have more regions that need assignment, again iterate the least loaded
   * servers, this time giving each one (filling them to <b>MAX</b>) until we run out.
   * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions. In addition, any
   * server hosting &gt;= <b>MAX</b> regions is guaranteed to end up with <b>MAX</b> regions at
   * the end of the balancing. This ensures the minimal number of regions possible are moved.
   * </ol>
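   * <p>
   * For example, with 4 servers and 14 regions the average is 3.5, so <b>MIN</b>=3 and
   * <b>MAX</b>=4: any server holding more than 4 regions sheds down to 4, and any server holding
   * fewer than 3 regions is filled up to 3.
   * </p>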
   * <p/>
   * TODO: We can at-most reassign the number of regions away from a particular server to be how
   * many they report as most loaded. Should we just keep all assignment in memory? Any objections?
   * Does this mean we need HeapSize on HMaster? Or just careful monitor? (current thinking is we
   * will hold all assignments in memory)
   * @param tableName      the table whose regions are being balanced
   * @param loadOfOneTable Map of regionservers and their load/region information to a list of
   *                       their most loaded regions
   * @return a list of regions to be moved, including source and destination, or null if the
   *         cluster is already balanced
   */
  @Override
  protected List<RegionPlan> balanceTable(TableName tableName,
    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
    List<RegionPlan> regionsToReturn = balanceMasterRegions(loadOfOneTable);
    if (regionsToReturn != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
      return regionsToReturn;
    }
    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
      if (loadOfOneTable.size() <= 2) {
        return null;
      }
      loadOfOneTable = new HashMap<>(loadOfOneTable);
      loadOfOneTable.remove(masterServerName);
    }

    long startTime = EnvironmentEdgeManager.currentTime();

    // construct a Cluster object with clusterMap and rest of the
    // argument as defaults
    BalancerClusterState c =
      new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager);
    if (!needsBalance(c) && !this.overallNeedsBalance()) {
      return null;
    }
    ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
    int numServers = cs.getNumServers();
    NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
    int numRegions = cs.getNumRegions();
    float average = cs.getLoadAverage();
    int max = (int) Math.ceil(average);
    int min = (int) average;
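    // min = floor(average) and max = ceil(average) are the only two legal per-server region
    // counts once this table is balanced.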
    // Log the parameters of this balancer run so results can be checked against them.
    StringBuilder strBalanceParam = new StringBuilder();
    strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
      .append(", numServers=").append(numServers).append(", max=").append(max).append(", min=")
      .append(min);
    LOG.debug(strBalanceParam.toString());

    // Balance the cluster
    // TODO: Look at data block locality or a more complex load to do this
    MinMaxPriorityQueue<RegionPlan> regionsToMove =
      MinMaxPriorityQueue.orderedBy(rpComparator).create();
    regionsToReturn = new ArrayList<>();

    // Walk down most loaded, pruning each to the max
    int serversOverloaded = 0;
    // flag used to fetch regions from head and tail of list, alternately
    boolean fetchFromTail = false;
    Map<ServerName, BalanceInfo> serverBalanceInfo = new TreeMap<>();
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
      .entrySet()) {
      ServerAndLoad sal = server.getKey();
      int load = sal.getLoad();
      if (load <= max) {
        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue()));
        continue;
      }
      serversOverloaded++;
      List<RegionInfo> regions = server.getValue();
      int numToOffload = Math.min(load - max, regions.size());
      // account for the out-of-band regions which were assigned to this server
      // after some other region server crashed
      Collections.sort(regions, riComparator);
      int numTaken = 0;
      for (int i = 0; i <= numToOffload;) {
        RegionInfo hri = regions.get(i); // fetch from head
        if (fetchFromTail) {
          hri = regions.get(regions.size() - 1 - i);
        }
        i++;
        // Don't rebalance special regions.
        if (shouldBeOnMaster(hri) && masterServerName.equals(sal.getServerName())) {
          continue;
        }
        regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
        numTaken++;
        if (numTaken >= numToOffload) {
          break;
        }
      }
      serverBalanceInfo.put(sal.getServerName(),
        new BalanceInfo(numToOffload, (-1) * numTaken, server.getValue()));
    }
    int totalNumMoved = regionsToMove.size();

    // Walk down least loaded, filling each to the min
    int neededRegions = 0; // number of regions needed to bring all up to min
    fetchFromTail = false;

    Map<ServerName, Integer> underloadedServers = new HashMap<>();
    int maxToTake = numRegions - min;
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
      if (maxToTake == 0) {
        break; // no more to take
      }
      int load = server.getKey().getLoad();
      if (load >= min) {
        continue; // look for other servers which haven't reached min
      }
      int regionsToPut = min - load;
      maxToTake -= regionsToPut;
      underloadedServers.put(server.getKey().getServerName(), regionsToPut);
    }
    // number of servers that get new regions
    int serversUnderloaded = underloadedServers.size();
    int incr = 1;
    List<ServerName> sns =
      Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
    Collections.shuffle(sns);
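    // Hand the shed regions to the underloaded servers in round-robin fashion, reversing the
    // sweep direction each pass so no server is systematically favored (see HBASE-3609).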
    while (regionsToMove.size() > 0) {
      int cnt = 0;
      int i = incr > 0 ? 0 : underloadedServers.size() - 1;
      for (; i >= 0 && i < underloadedServers.size(); i += incr) {
        if (regionsToMove.isEmpty()) {
          break;
        }
        ServerName si = sns.get(i);
        int numToTake = underloadedServers.get(si);
        if (numToTake == 0) {
          continue;
        }

        addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);

        underloadedServers.put(si, numToTake - 1);
        cnt++;
        BalanceInfo bi = serverBalanceInfo.get(si);
        bi.setNumRegionsAdded(bi.getNumRegionsAdded() + 1);
      }
      if (cnt == 0) {
        break;
      }
      // iterates underloadedServers in the other direction
      incr = -incr;
    }
    for (Integer i : underloadedServers.values()) {
      // If we still want to take some, increment needed
      neededRegions += i;
    }

    // Need to do a second pass.
    // Either more regions to assign out or servers that are still underloaded

    // If we need more to fill min, grab one from each most loaded until enough
    if (neededRegions != 0) {
      // Walk down most loaded, grabbing one from each until we get enough
      for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
        .entrySet()) {
        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
        int idx = balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
        if (idx >= server.getValue().size()) {
          break;
        }
        RegionInfo region = server.getValue().get(idx);
        if (region.isMetaRegion()) {
          continue; // Don't move meta regions.
        }
        regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
        balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
        totalNumMoved++;
        if (--neededRegions == 0) {
          // No more regions needed, done shedding
          break;
        }
      }
    }

    // Now we have a set of regions that must be all assigned out
    // Assign each underloaded up to the min, then if leftovers, assign to max

    // Walk down least loaded, assigning to each to fill up to min
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
      int regionCount = server.getKey().getLoad();
      if (regionCount >= min) {
        break;
      }
      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
      if (balanceInfo != null) {
        regionCount += balanceInfo.getNumRegionsAdded();
      }
      if (regionCount >= min) {
        continue;
      }
      int numToTake = min - regionCount;
      int numTaken = 0;
      while (numTaken < numToTake && 0 < regionsToMove.size()) {
        addRegionPlan(regionsToMove, fetchFromTail, server.getKey().getServerName(),
          regionsToReturn);
        numTaken++;
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() + 1);
      }
    }
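    // When min != max there is slack in which servers end up holding min versus max regions for
    // this table; balanceOverall() uses that slack to even out the cluster-wide load.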
    if (min != max) {
      balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
    }

    long endTime = EnvironmentEdgeManager.currentTime();

    if (!regionsToMove.isEmpty() || neededRegions != 0) {
      // Emit data so we can diagnose how the balancer went astray.
      LOG.warn(
        "regionsToMove=" + totalNumMoved + ", numServers=" + numServers + ", serversOverloaded="
          + serversOverloaded + ", serversUnderloaded=" + serversUnderloaded);
      StringBuilder sb = new StringBuilder();
      for (Map.Entry<ServerName, List<RegionInfo>> e : loadOfOneTable.entrySet()) {
        if (sb.length() > 0) {
          sb.append(", ");
        }
        sb.append(e.getKey().toString());
        sb.append(" ");
        sb.append(e.getValue().size());
      }
      LOG.warn("Input " + sb.toString());
    }

    // All done!
    LOG.info("Done. Calculated a load balance in " + (endTime - startTime) + "ms. " + "Moving "
      + totalNumMoved + " regions off of " + serversOverloaded + " overloaded servers onto "
      + serversUnderloaded + " less loaded servers");

    return regionsToReturn;
  }

  /**
   * If we need to balance overall, we add one more round to peel off one region from each server
   * at <b>max</b>. Together with the other regions left to be assigned, we distribute all of
   * regionsToMove to the servers that carry fewer regions in whole-cluster scope.
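   * <p/>
   * The overall balance proceeds in five steps: (1) record the plans produced so far so cyclic
   * assignment pairs can be resolved, (2) peel one region off each server holding <b>max</b>
   * regions, (3) sort servers by their cluster-wide load, (4) drop plans whose source is itself
   * one of the least-loaded destination candidates, and (5) assign the remaining regions to the
   * least-loaded servers.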
   */
  private void balanceOverall(List<RegionPlan> regionsToReturn,
    Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
    MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min) {
    // Step 1.
    // A map to record the plans we already have as the status quo, in order to resolve a cyclic
    // assignment pair,
    // e.g. plan 1: A -> B, plan 2: B -> C => resolve plan1 to A -> C, remove plan2
    Map<ServerName, List<Integer>> returnMap = new HashMap<>();
    for (int i = 0; i < regionsToReturn.size(); i++) {
      List<Integer> pos = returnMap.get(regionsToReturn.get(i).getDestination());
      if (pos == null) {
        pos = new ArrayList<>();
        returnMap.put(regionsToReturn.get(i).getDestination(), pos);
      }
      pos.add(i);
    }

    // Step 2.
    // Peel off one region from each RS which currently has the max number of regions.
    // Each RS should have either max or min numbers of regions for this table.
    for (int i = 0; i < serverLoadList.size(); i++) {
      ServerAndLoad serverload = serverLoadList.get(i);
      BalanceInfo balanceInfo = serverBalanceInfo.get(serverload.getServerName());
      if (balanceInfo == null) {
        continue;
      }
      setLoad(serverLoadList, i, balanceInfo.getNumRegionsAdded());
      if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() == max) {
        RegionInfo hriToPlan;
        if (balanceInfo.getHriList().isEmpty()) {
          LOG.debug("During balanceOverall, we found " + serverload.getServerName()
            + " has no RegionInfo, no operation needed");
          continue;
        } else if (balanceInfo.getNextRegionForUnload() >= balanceInfo.getHriList().size()) {
          continue;
        } else {
          hriToPlan = balanceInfo.getHriList().get(balanceInfo.getNextRegionForUnload());
        }
        RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
        regionsToMove.add(maxPlan);
        setLoad(serverLoadList, i, -1);
      } else if (
        balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
          || balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min
      ) {
        LOG.warn("Encountered an incorrect region count while computing the move plan during"
          + " balanceOverall: for this table, " + serverload.getServerName() + " originally had "
          + balanceInfo.getHriList().size() + " regions and " + balanceInfo.getNumRegionsAdded()
          + " regions have been added. Yet, max=" + max + ", min=" + min
          + ". Stopping the balance for this table"); // should not happen
        return;
      }
    }

    // Step 3. Sort serverLoadList, which holds the overall load of each server.
    // We only need to assign the regionsToMove to the first n = regionsToMove.size() servers
    // with the least load.
    serverLoadList.sort(Comparator.comparingInt(ServerAndLoad::getLoad));

    // Step 4.
    // Preparation before assigning out all regionsToMove.
    // We need to remove any plan whose source RS equals its destination RS,
    // since the source RS belongs to the n least loaded RS.
    int assignLength = regionsToMove.size();
    // A structure to map a ServerName to its load and index in serverLoadList
    Map<ServerName, Pair<ServerAndLoad, Integer>> snLoadMap = new HashMap<>();
    for (int i = 0; i < serverLoadList.size(); i++) {
      snLoadMap.put(serverLoadList.get(i).getServerName(), new Pair<>(serverLoadList.get(i), i));
    }
    Pair<ServerAndLoad, Integer> shedLoad;
    // A list marking the plans in regionsToMove that should be removed
    List<RegionPlan> planToRemoveList = new ArrayList<>();
    // A structure to record how many times a server becomes the source of a plan in
    // regionsToMove.
    Map<ServerName, Integer> sourceMap = new HashMap<>();
    // We remove a plan which would cause the source RS to equal the destination RS.
    // But keep in mind that further plans from such an RS should be kept.
    for (RegionPlan plan : regionsToMove) {
      // the source RS's load and index in serverLoadList
      shedLoad = snLoadMap.get(plan.getSource());
      if (!sourceMap.containsKey(plan.getSource())) {
        sourceMap.put(plan.getSource(), 0);
      }
      sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
      if (shedLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
        planToRemoveList.add(plan);
        // While marked as to be removed, the count should be added back to the source RS
        setLoad(serverLoadList, shedLoad.getSecond(), 1);
      }
    }
    // Remove the marked plans from regionsToMove; we cannot remove them directly while
    // iterating through regionsToMove, since it is a MinMaxPriorityQueue.
    for (RegionPlan planToRemove : planToRemoveList) {
      regionsToMove.remove(planToRemove);
    }

    // Step 5.
    // We only need to assign the regionsToMove to the first n = regionsToMove.size() servers,
    // those with the least load. With this strategy adopted, we can gradually achieve the
    // overall balance, while keeping the table level balanced.
    for (int i = 0; i < assignLength; i++) {
      // skip an RS that is also a source; we removed its plan from regionsToMove in the
      // previous step
      if (sourceMap.containsKey(serverLoadList.get(i).getServerName())) {
        continue;
      }
      addRegionPlan(regionsToMove, fetchFromTail, serverLoadList.get(i).getServerName(),
        regionsToReturn);
      setLoad(serverLoadList, i, 1);
      // resolve a possible cyclic assignment pair if we just produced one:
      // e.g. plan1: A -> B, plan2: B -> C => resolve plan1 to A -> C and remove plan2
      List<Integer> pos =
        returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
      if (pos != null && pos.size() != 0) {
        regionsToReturn.get(pos.get(pos.size() - 1))
          .setDestination(regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
        pos.remove(pos.size() - 1);
        regionsToReturn.remove(regionsToReturn.size() - 1);
      }
    }
    // Done balance overall
  }

  /**
   * Add a region from the head or tail of regionsToMove to the list of regions to return.
   */
  private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
    final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
    RegionPlan rp = null;
    if (!fetchFromTail) {
      rp = regionsToMove.remove();
    } else {
      rp = regionsToMove.removeLast();
    }
    rp.setDestination(sn);
    regionsToReturn.add(rp);
  }
}