001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.NavigableMap;
029import java.util.TreeMap;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseInterfaceAudience;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.master.RegionPlan;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.hbase.util.Pair;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
043
044/**
045 * Makes decisions about the placement and movement of Regions across RegionServers.
046 * <p/>
047 * Cluster-wide load balancing will occur only when there are no regions in transition and according
048 * to a fixed period of a time using {@link #balanceCluster(Map)}.
049 * <p/>
050 * On cluster startup, bulk assignment can be used to determine locations for all Regions in a
051 * cluster.
052 * <p/>
053 * This classes produces plans for the
054 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to execute.
055 */
056@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
057public class SimpleLoadBalancer extends BaseLoadBalancer {
058  private static final Logger LOG = LoggerFactory.getLogger(SimpleLoadBalancer.class);
059
060  private RegionInfoComparator riComparator = new RegionInfoComparator();
061  private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
062  private float avgLoadOverall;
063  private List<ServerAndLoad> serverLoadList = new ArrayList<>();
064  // overallSlop to control simpleLoadBalancer's cluster level threshold
065  private float overallSlop;
066
067  /**
068   * Stores additional per-server information about the regions added/removed during the run of the
069   * balancing algorithm.
070   * </p>
071   * For servers that shed regions, we need to track which regions we have already shed.
072   * <b>nextRegionForUnload</b> contains the index in the list of regions on the server that is the
073   * next to be shed.
074   */
075  private static final class BalanceInfo {
076
077    private int nextRegionForUnload;
078    private int numRegionsAdded;
079    private List<RegionInfo> hriList;
080
081    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded, List<RegionInfo> hriList) {
082      this.nextRegionForUnload = nextRegionForUnload;
083      this.numRegionsAdded = numRegionsAdded;
084      this.hriList = hriList;
085    }
086
087    int getNextRegionForUnload() {
088      return nextRegionForUnload;
089    }
090
091    int getNumRegionsAdded() {
092      return numRegionsAdded;
093    }
094
095    void setNumRegionsAdded(int numAdded) {
096      this.numRegionsAdded = numAdded;
097    }
098
099    List<RegionInfo> getHriList() {
100      return hriList;
101    }
102
103    void setNextRegionForUnload(int nextRegionForUnload) {
104      this.nextRegionForUnload = nextRegionForUnload;
105    }
106
107  }
108
109  /**
110   * Pass RegionStates and allow balancer to set the current cluster load.
111   */
112  @RestrictedApi(explanation = "Should only be called in tests", link = "",
113      allowedOnPath = ".*(/src/test/.*|SimpleLoadBalancer).java")
114  void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
115    serverLoadList.clear();
116    Map<ServerName, Integer> server2LoadMap = new HashMap<>();
117    float sum = 0;
118    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad
119      .entrySet()) {
120      for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterEntry.getValue().entrySet()) {
121        if (entry.getKey().equals(masterServerName)) {
122          continue; // we shouldn't include master as potential assignee
123        }
124        int regionNum = entry.getValue().size();
125        server2LoadMap.compute(entry.getKey(), (k, v) -> v == null ? regionNum : regionNum + v);
126        sum += regionNum;
127      }
128    }
129    server2LoadMap.forEach((k, v) -> {
130      serverLoadList.add(new ServerAndLoad(k, v));
131    });
132    avgLoadOverall = sum / serverLoadList.size();
133  }
134
135  @Override
136  protected void
137    preBalanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
138    // We need clusterLoad of all regions on every server to achieve overall balanced
139    setClusterLoad(loadOfAllTable);
140  }
141
142  @Override
143  protected void loadConf(Configuration conf) {
144    super.loadConf(conf);
145    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
146  }
147
148  @Override
149  public void onConfigurationChange(Configuration conf) {
150    float originSlop = slop;
151    float originOverallSlop = overallSlop;
152    loadConf(conf);
153    LOG.info(
154      "Update configuration of SimpleLoadBalancer, previous slop is {},"
155        + " current slop is {}, previous overallSlop is {}, current overallSlop is {}",
156      originSlop, slop, originOverallSlop, overallSlop);
157  }
158
159  private void setLoad(List<ServerAndLoad> slList, int i, int loadChange) {
160    ServerAndLoad newsl =
161      new ServerAndLoad(slList.get(i).getServerName(), slList.get(i).getLoad() + loadChange);
162    slList.set(i, newsl);
163  }
164
165  /**
166   * A checker function to decide when we want balance overall and certain table has been balanced,
167   * do we still need to re-distribute regions of this table to achieve the state of overall-balance
168   * @return true if this table should be balanced.
169   */
170  private boolean overallNeedsBalance() {
171    int floor = (int) Math.floor(avgLoadOverall * (1 - overallSlop));
172    int ceiling = (int) Math.ceil(avgLoadOverall * (1 + overallSlop));
173    int max = 0, min = Integer.MAX_VALUE;
174    for (ServerAndLoad server : serverLoadList) {
175      max = Math.max(server.getLoad(), max);
176      min = Math.min(server.getLoad(), min);
177    }
178    if (max <= ceiling && min >= floor) {
179      if (LOG.isTraceEnabled()) {
180        // If nothing to balance, then don't say anything unless trace-level logging.
181        LOG.trace("Skipping load balancing because cluster is balanced at overall level");
182      }
183      return false;
184    }
185    return true;
186  }
187
188  private boolean needsBalance(BalancerClusterState c) {
189    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
190    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
191      if (LOG.isDebugEnabled()) {
192        LOG.debug(
193          "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
194      }
195      return false;
196    }
197    if (idleRegionServerExist(c)) {
198      return true;
199    }
200    // Check if we even need to do any load balancing
201    // HBASE-3681 check sloppiness first
202    return sloppyRegionServerExist(cs);
203  }
204
205  /**
206   * Generate a global load balancing plan according to the specified map of server information to
207   * the most loaded regions of each server. The load balancing invariant is that all servers are
208   * within 1 region of the average number of regions per server. If the average is an integer
209   * number, all servers will be balanced to the average. Otherwise, all servers will have either
210   * floor(average) or ceiling(average) regions. HBASE-3609 Modeled regionsToMove using Guava's
211   * MinMaxPriorityQueue so that we can fetch from both ends of the queue. At the beginning, we
212   * check whether there was empty region server just discovered by Master. If so, we alternately
213   * choose new / old regions from head / tail of regionsToMove, respectively. This alternation
214   * avoids clustering young regions on the newly discovered region server. Otherwise, we choose new
215   * regions from head of regionsToMove. Another improvement from HBASE-3609 is that we assign
216   * regions from regionsToMove to underloaded servers in round-robin fashion. Previously one
217   * underloaded server would be filled before we move onto the next underloaded server, leading to
218   * clustering of young regions. Finally, we randomly shuffle underloaded servers so that they
219   * receive offloaded regions relatively evenly across calls to balanceCluster(). The algorithm is
220   * currently implemented as such:
221   * <ol>
222   * <li>Determine the two valid numbers of regions each server should have,
223   * <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
224   * <li>Iterate down the most loaded servers, shedding regions from each so each server hosts
225   * exactly <b>MAX</b> regions. Stop once you reach a server that already has &lt;= <b>MAX</b>
226   * regions.
227   * <p>
228   * Order the regions to move from most recent to least.
229   * <li>Iterate down the least loaded servers, assigning regions so each server has exactly
230   * <b>MIN</b> regions. Stop once you reach a server that already has &gt;= <b>MIN</b> regions.
231   * Regions being assigned to underloaded servers are those that were shed in the previous step. It
232   * is possible that there were not enough regions shed to fill each underloaded server to
233   * <b>MIN</b>. If so we end up with a number of regions required to do so, <b>neededRegions</b>.
234   * It is also possible that we were able to fill each underloaded but ended up with regions that
235   * were unassigned from overloaded servers but that still do not have assignment. If neither of
236   * these conditions hold (no regions needed to fill the underloaded servers, no regions leftover
237   * from overloaded servers), we are done and return. Otherwise we handle these cases below.
238   * <li>If <b>neededRegions</b> is non-zero (still have underloaded servers), we iterate the most
239   * loaded servers again, shedding a single server from each (this brings them from having
240   * <b>MAX</b> regions to having <b>MIN</b> regions).
241   * <li>We now definitely have more regions that need assignment, either from the previous step or
242   * from the original shedding from overloaded servers. Iterate the least loaded servers filling
243   * each to <b>MIN</b>.
244   * <li>If we still have more regions that need assignment, again iterate the least loaded servers,
245   * this time giving each one (filling them to <b>MAX</b>) until we run out.
246   * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions. In addition, any server
247   * hosting &gt;= <b>MAX</b> regions is guaranteed to end up with <b>MAX</b> regions at the end of
248   * the balancing. This ensures the minimal number of regions possible are moved.
249   * </ol>
250   * TODO: We can at-most reassign the number of regions away from a particular server to be how
251   * many they report as most loaded. Should we just keep all assignment in memory? Any objections?
252   * Does this mean we need HeapSize on HMaster? Or just careful monitor? (current thinking is we
253   * will hold all assignments in memory)
254   * @param loadOfOneTable Map of regionservers and their load/region information to a list of their
255   *                       most loaded regions
256   * @return a list of regions to be moved, including source and destination, or null if cluster is
257   *         already balanced
258   */
259  @Override
260  protected List<RegionPlan> balanceTable(TableName tableName,
261    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
262    List<RegionPlan> regionsToReturn = balanceMasterRegions(loadOfOneTable);
263    if (regionsToReturn != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
264      return regionsToReturn;
265    }
266    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
267      if (loadOfOneTable.size() <= 2) {
268        return null;
269      }
270      loadOfOneTable = new HashMap<>(loadOfOneTable);
271      loadOfOneTable.remove(masterServerName);
272    }
273
274    long startTime = EnvironmentEdgeManager.currentTime();
275
276    // construct a Cluster object with clusterMap and rest of the
277    // argument as defaults
278    BalancerClusterState c =
279      new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager);
280    if (!needsBalance(c) && !this.overallNeedsBalance()) {
281      return null;
282    }
283    ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
284    int numServers = cs.getNumServers();
285    NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
286    int numRegions = cs.getNumRegions();
287    float average = cs.getLoadAverage();
288    int max = (int) Math.ceil(average);
289    int min = (int) average;
290
291    // Using to check balance result.
292    StringBuilder strBalanceParam = new StringBuilder();
293    strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
294      .append(", numServers=").append(numServers).append(", max=").append(max).append(", min=")
295      .append(min);
296    LOG.debug(strBalanceParam.toString());
297
298    // Balance the cluster
299    // TODO: Look at data block locality or a more complex load to do this
300    MinMaxPriorityQueue<RegionPlan> regionsToMove =
301      MinMaxPriorityQueue.orderedBy(rpComparator).create();
302    regionsToReturn = new ArrayList<>();
303
304    // Walk down most loaded, pruning each to the max
305    int serversOverloaded = 0;
306    // flag used to fetch regions from head and tail of list, alternately
307    boolean fetchFromTail = false;
308    Map<ServerName, BalanceInfo> serverBalanceInfo = new TreeMap<>();
309    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
310      .entrySet()) {
311      ServerAndLoad sal = server.getKey();
312      int load = sal.getLoad();
313      if (load <= max) {
314        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue()));
315        continue;
316      }
317      serversOverloaded++;
318      List<RegionInfo> regions = server.getValue();
319      int numToOffload = Math.min(load - max, regions.size());
320      // account for the out-of-band regions which were assigned to this server
321      // after some other region server crashed
322      Collections.sort(regions, riComparator);
323      int numTaken = 0;
324      for (int i = 0; i <= numToOffload;) {
325        RegionInfo hri = regions.get(i); // fetch from head
326        if (fetchFromTail) {
327          hri = regions.get(regions.size() - 1 - i);
328        }
329        i++;
330        // Don't rebalance special regions.
331        if (shouldBeOnMaster(hri) && masterServerName.equals(sal.getServerName())) continue;
332        regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
333        numTaken++;
334        if (numTaken >= numToOffload) {
335          break;
336        }
337      }
338      serverBalanceInfo.put(sal.getServerName(),
339        new BalanceInfo(numToOffload, (-1) * numTaken, server.getValue()));
340    }
341    int totalNumMoved = regionsToMove.size();
342
343    // Walk down least loaded, filling each to the min
344    int neededRegions = 0; // number of regions needed to bring all up to min
345    fetchFromTail = false;
346
347    Map<ServerName, Integer> underloadedServers = new HashMap<>();
348    int maxToTake = numRegions - min;
349    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
350      if (maxToTake == 0) {
351        break; // no more to take
352      }
353      int load = server.getKey().getLoad();
354      if (load >= min) {
355        continue; // look for other servers which haven't reached min
356      }
357      int regionsToPut = min - load;
358      maxToTake -= regionsToPut;
359      underloadedServers.put(server.getKey().getServerName(), regionsToPut);
360    }
361    // number of servers that get new regions
362    int serversUnderloaded = underloadedServers.size();
363    int incr = 1;
364    List<ServerName> sns =
365      Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
366    Collections.shuffle(sns);
367    while (regionsToMove.size() > 0) {
368      int cnt = 0;
369      int i = incr > 0 ? 0 : underloadedServers.size() - 1;
370      for (; i >= 0 && i < underloadedServers.size(); i += incr) {
371        if (regionsToMove.isEmpty()) {
372          break;
373        }
374        ServerName si = sns.get(i);
375        int numToTake = underloadedServers.get(si);
376        if (numToTake == 0) {
377          continue;
378        }
379
380        addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
381
382        underloadedServers.put(si, numToTake - 1);
383        cnt++;
384        BalanceInfo bi = serverBalanceInfo.get(si);
385        bi.setNumRegionsAdded(bi.getNumRegionsAdded() + 1);
386      }
387      if (cnt == 0) {
388        break;
389      }
390      // iterates underloadedServers in the other direction
391      incr = -incr;
392    }
393    for (Integer i : underloadedServers.values()) {
394      // If we still want to take some, increment needed
395      neededRegions += i;
396    }
397
398    // Need to do a second pass.
399    // Either more regions to assign out or servers that are still underloaded
400
401    // If we need more to fill min, grab one from each most loaded until enough
402    if (neededRegions != 0) {
403      // Walk down most loaded, grabbing one from each until we get enough
404      for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
405        .entrySet()) {
406        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
407        int idx = balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
408        if (idx >= server.getValue().size()) {
409          break;
410        }
411        RegionInfo region = server.getValue().get(idx);
412        if (region.isMetaRegion()) {
413          continue; // Don't move meta regions.
414        }
415        regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
416        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
417        balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
418        totalNumMoved++;
419        if (--neededRegions == 0) {
420          // No more regions needed, done shedding
421          break;
422        }
423      }
424    }
425
426    // Now we have a set of regions that must be all assigned out
427    // Assign each underloaded up to the min, then if leftovers, assign to max
428
429    // Walk down least loaded, assigning to each to fill up to min
430    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
431      int regionCount = server.getKey().getLoad();
432      if (regionCount >= min) {
433        break;
434      }
435      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
436      if (balanceInfo != null) {
437        regionCount += balanceInfo.getNumRegionsAdded();
438      }
439      if (regionCount >= min) {
440        continue;
441      }
442      int numToTake = min - regionCount;
443      int numTaken = 0;
444      while (numTaken < numToTake && 0 < regionsToMove.size()) {
445        addRegionPlan(regionsToMove, fetchFromTail, server.getKey().getServerName(),
446          regionsToReturn);
447        numTaken++;
448        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() + 1);
449      }
450    }
451
452    if (min != max) {
453      balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
454    }
455
456    long endTime = EnvironmentEdgeManager.currentTime();
457
458    if (!regionsToMove.isEmpty() || neededRegions != 0) {
459      // Emit data so can diagnose how balancer went astray.
460      LOG.warn(
461        "regionsToMove=" + totalNumMoved + ", numServers=" + numServers + ", serversOverloaded="
462          + serversOverloaded + ", serversUnderloaded=" + serversUnderloaded);
463      StringBuilder sb = new StringBuilder();
464      for (Map.Entry<ServerName, List<RegionInfo>> e : loadOfOneTable.entrySet()) {
465        if (sb.length() > 0) {
466          sb.append(", ");
467        }
468        sb.append(e.getKey().toString());
469        sb.append(" ");
470        sb.append(e.getValue().size());
471      }
472      LOG.warn("Input " + sb.toString());
473    }
474
475    // All done!
476    LOG.info("Done. Calculated a load balance in " + (endTime - startTime) + "ms. " + "Moving "
477      + totalNumMoved + " regions off of " + serversOverloaded + " overloaded servers onto "
478      + serversUnderloaded + " less loaded servers");
479
480    return regionsToReturn;
481  }
482
483  /**
484   * If we need to balanceoverall, we need to add one more round to peel off one region from each
485   * max. Together with other regions left to be assigned, we distribute all regionToMove, to the RS
486   * that have less regions in whole cluster scope.
487   */
488  private void balanceOverall(List<RegionPlan> regionsToReturn,
489    Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
490    MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min) {
491    // Step 1.
492    // A map to record the plan we have already got as status quo, in order to resolve a cyclic
493    // assignment pair,
494    // e.g. plan 1: A -> B, plan 2: B ->C => resolve plan1 to A -> C, remove plan2
495    Map<ServerName, List<Integer>> returnMap = new HashMap<>();
496    for (int i = 0; i < regionsToReturn.size(); i++) {
497      List<Integer> pos = returnMap.get(regionsToReturn.get(i).getDestination());
498      if (pos == null) {
499        pos = new ArrayList<>();
500        returnMap.put(regionsToReturn.get(i).getDestination(), pos);
501      }
502      pos.add(i);
503    }
504
505    // Step 2.
506    // Peel off one region from each RS which has max number of regions now.
507    // Each RS should have either max or min numbers of regions for this table.
508    for (int i = 0; i < serverLoadList.size(); i++) {
509      ServerAndLoad serverload = serverLoadList.get(i);
510      BalanceInfo balanceInfo = serverBalanceInfo.get(serverload.getServerName());
511      if (balanceInfo == null) {
512        continue;
513      }
514      setLoad(serverLoadList, i, balanceInfo.getNumRegionsAdded());
515      if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() == max) {
516        RegionInfo hriToPlan;
517        if (balanceInfo.getHriList().isEmpty()) {
518          LOG.debug("During balanceOverall, we found " + serverload.getServerName()
519            + " has no RegionInfo, no operation needed");
520          continue;
521        } else if (balanceInfo.getNextRegionForUnload() >= balanceInfo.getHriList().size()) {
522          continue;
523        } else {
524          hriToPlan = balanceInfo.getHriList().get(balanceInfo.getNextRegionForUnload());
525        }
526        RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
527        regionsToMove.add(maxPlan);
528        setLoad(serverLoadList, i, -1);
529      } else if (
530        balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
531          || balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min
532      ) {
533        LOG.warn(
534          "Encounter incorrect region numbers after calculating move plan during balanceOverall, "
535            + "for this table, " + serverload.getServerName() + " originally has "
536            + balanceInfo.getHriList().size() + " regions and " + balanceInfo.getNumRegionsAdded()
537            + " regions have been added. Yet, max =" + max + ", min =" + min
538            + ". Thus stop balance for this table"); // should not happen
539        return;
540      }
541    }
542
543    // Step 3. sort the ServerLoadList, the ArrayList hold overall load for each server.
544    // We only need to assign the regionsToMove to
545    // the first n = regionsToMove.size() RS that has least load.
546    Collections.sort(serverLoadList, new Comparator<ServerAndLoad>() {
547      @Override
548      public int compare(ServerAndLoad s1, ServerAndLoad s2) {
549        if (s1.getLoad() == s2.getLoad()) {
550          return 0;
551        } else {
552          return (s1.getLoad() > s2.getLoad()) ? 1 : -1;
553        }
554      }
555    });
556
557    // Step 4.
558    // Preparation before assign out all regionsToMove.
559    // We need to remove the plan that has the source RS equals to destination RS,
560    // since the source RS belongs to the least n loaded RS.
561    int assignLength = regionsToMove.size();
562    // A structure help to map ServerName to it's load and index in ServerLoadList
563    Map<ServerName, Pair<ServerAndLoad, Integer>> SnLoadMap = new HashMap<>();
564    for (int i = 0; i < serverLoadList.size(); i++) {
565      SnLoadMap.put(serverLoadList.get(i).getServerName(), new Pair<>(serverLoadList.get(i), i));
566    }
567    Pair<ServerAndLoad, Integer> shredLoad;
568    // A List to help mark the plan in regionsToMove that should be removed
569    List<RegionPlan> planToRemoveList = new ArrayList<>();
570    // A structure to record how many times a server becomes the source of a plan, from
571    // regionsToMove.
572    Map<ServerName, Integer> sourceMap = new HashMap<>();
573    // We remove one of the plan which would cause source RS equals destination RS.
574    // But we should keep in mind that the second plan from such RS should be kept.
575    for (RegionPlan plan : regionsToMove) {
576      // the source RS's load and index in ServerLoadList
577      shredLoad = SnLoadMap.get(plan.getSource());
578      if (!sourceMap.containsKey(plan.getSource())) {
579        sourceMap.put(plan.getSource(), 0);
580      }
581      sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
582      if (shredLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
583        planToRemoveList.add(plan);
584        // While marked as to be removed, the count should be add back to the source RS
585        setLoad(serverLoadList, shredLoad.getSecond(), 1);
586      }
587    }
588    // Remove those marked plans from regionsToMove,
589    // we cannot direct remove them during iterating through
590    // regionsToMove, due to the fact that regionsToMove is a MinMaxPriorityQueue.
591    for (RegionPlan planToRemove : planToRemoveList) {
592      regionsToMove.remove(planToRemove);
593    }
594
595    // Step 5.
596    // We only need to assign the regionsToMove to
597    // the first n = regionsToMove.size() of them, with least load.
598    // With this strategy adopted, we can gradually achieve the overall balance,
599    // while keeping table level balanced.
600    for (int i = 0; i < assignLength; i++) {
601      // skip the RS that is also the source, we have removed them from regionsToMove in previous
602      // step
603      if (sourceMap.containsKey(serverLoadList.get(i).getServerName())) {
604        continue;
605      }
606      addRegionPlan(regionsToMove, fetchFromTail, serverLoadList.get(i).getServerName(),
607        regionsToReturn);
608      setLoad(serverLoadList, i, 1);
609      // resolve a possible cyclic assignment pair if we just produced one:
610      // e.g. plan1: A -> B, plan2: B -> C => resolve plan1 to A -> C and remove plan2
611      List<Integer> pos =
612        returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
613      if (pos != null && pos.size() != 0) {
614        regionsToReturn.get(pos.get(pos.size() - 1))
615          .setDestination(regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
616        pos.remove(pos.size() - 1);
617        regionsToReturn.remove(regionsToReturn.size() - 1);
618      }
619    }
620    // Done balance overall
621  }
622
623  /**
624   * Add a region from the head or tail to the List of regions to return.
625   */
626  private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
627    final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
628    RegionPlan rp = null;
629    if (!fetchFromTail) {
630      rp = regionsToMove.remove();
631    } else {
632      rp = regionsToMove.removeLast();
633    }
634    rp.setDestination(sn);
635    regionsToReturn.add(rp);
636  }
637}