/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import com.google.errorprone.annotations.RestrictedApi;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;

/**
 * Makes decisions about the placement and movement of Regions across RegionServers.
 * <p/>
 * Cluster-wide load balancing occurs only when there are no regions in transition, and is run at a
 * fixed interval via {@link #balanceCluster(Map)}.
 * <p/>
 * On cluster startup, bulk assignment can be used to determine locations for all Regions in a
 * cluster.
 * <p/>
 * This class produces plans for the {@code AssignmentManager} to execute.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SimpleLoadBalancer extends BaseLoadBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleLoadBalancer.class);

  private RegionInfoComparator riComparator = new RegionInfoComparator();
  private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
  private float avgLoadOverall;
  private List<ServerAndLoad> serverLoadList = new ArrayList<>();
  // overallSlop controls SimpleLoadBalancer's cluster-level threshold
  private float overallSlop;

  /**
   * Stores additional per-server information about the regions added/removed during the run of the
   * balancing algorithm.
   * <p/>
   * For servers that shed regions, we need to track which regions we have already shed.
   * <b>nextRegionForUnload</b> contains the index into the server's region list of the next region
   * to be shed.
   */
  private static final class BalanceInfo {

    private int nextRegionForUnload;
    private int numRegionsAdded;
    private List<RegionInfo> hriList;

    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded, List<RegionInfo> hriList) {
      this.nextRegionForUnload = nextRegionForUnload;
      this.numRegionsAdded = numRegionsAdded;
      this.hriList = hriList;
    }

    int getNextRegionForUnload() {
      return nextRegionForUnload;
    }

    int getNumRegionsAdded() {
      return numRegionsAdded;
    }

    void setNumRegionsAdded(int numAdded) {
      this.numRegionsAdded = numAdded;
    }

    List<RegionInfo> getHriList() {
      return hriList;
    }

    void setNextRegionForUnload(int nextRegionForUnload) {
      this.nextRegionForUnload = nextRegionForUnload;
    }

  }

  /**
   * Pass RegionStates and allow balancer to set the current cluster load.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*(/src/test/.*|SimpleLoadBalancer).java")
  void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
    serverLoadList.clear();
    Map<ServerName, Integer> server2LoadMap = new HashMap<>();
    float sum = 0;
    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad
      .entrySet()) {
      for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterEntry.getValue().entrySet()) {
        int regionNum = entry.getValue().size();
        server2LoadMap.compute(entry.getKey(), (k, v) -> v == null ? regionNum : regionNum + v);
        sum += regionNum;
      }
    }
    server2LoadMap.forEach((k, v) -> {
      serverLoadList.add(new ServerAndLoad(k, v));
    });
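    // Overall average load = total number of regions across all tables divided by the number of
    // servers present in the cluster load map.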
    avgLoadOverall = sum / serverLoadList.size();
  }

  @Override
  protected void
    preBalanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
    // We need the cluster load of all regions on every server to achieve overall balance
    setClusterLoad(loadOfAllTable);
  }

  @Override
  protected void loadConf(Configuration conf) {
    super.loadConf(conf);
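    // Fall back to the per-table slop when hbase.regions.overallSlop is not explicitly set.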
    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    float originSlop = slop;
    float originOverallSlop = overallSlop;
    loadConf(conf);
    LOG.info(
      "Update configuration of SimpleLoadBalancer, previous slop is {},"
        + " current slop is {}, previous overallSlop is {}, current overallSlop is {}",
      originSlop, slop, originOverallSlop, overallSlop);
  }

  private void setLoad(List<ServerAndLoad> slList, int i, int loadChange) {
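    // Replace the entry at index i with a new ServerAndLoad reflecting the load change.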
    ServerAndLoad newsl =
      new ServerAndLoad(slList.get(i).getServerName(), slList.get(i).getLoad() + loadChange);
    slList.set(i, newsl);
  }

  /**
   * A checker function to decide, when we want overall balance and a certain table has already
   * been balanced, whether we still need to re-distribute regions of this table to achieve the
   * state of overall balance.
   * @return true if this table should be balanced.
   */
  private boolean overallNeedsBalance() {
    int floor = (int) Math.floor(avgLoadOverall * (1 - overallSlop));
    int ceiling = (int) Math.ceil(avgLoadOverall * (1 + overallSlop));
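    // For example (illustrative numbers): with avgLoadOverall = 10 and overallSlop = 0.2, the
    // acceptable band is [8, 12] regions per server.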
    int max = 0, min = Integer.MAX_VALUE;
    for (ServerAndLoad server : serverLoadList) {
      max = Math.max(server.getLoad(), max);
      min = Math.min(server.getLoad(), min);
    }
    if (max <= ceiling && min >= floor) {
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because cluster is balanced at overall level");
      }
      return false;
    }
    return true;
  }

  private boolean needsBalance(BalancerClusterState c) {
    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
      }
      return false;
    }
    if (idleRegionServerExist(c)) {
      return true;
    }
    // Check if we even need to do any load balancing
    // HBASE-3681 check sloppiness first
    return sloppyRegionServerExist(cs);
  }

  /**
   * Generate a global load balancing plan according to the specified map of server information to
   * the most loaded regions of each server. The load balancing invariant is that all servers are
   * within 1 region of the average number of regions per server. If the average is an integer
   * number, all servers will be balanced to the average. Otherwise, all servers will have either
   * floor(average) or ceiling(average) regions. HBASE-3609 modeled regionsToMove using Guava's
   * MinMaxPriorityQueue so that we can fetch from both ends of the queue. At the beginning, we
   * check whether there is an empty region server just discovered by the Master. If so, we
   * alternately choose new / old regions from the head / tail of regionsToMove, respectively. This
   * alternation avoids clustering young regions on the newly discovered region server. Otherwise,
   * we choose new regions from the head of regionsToMove. Another improvement from HBASE-3609 is
   * that we assign regions from regionsToMove to underloaded servers in round-robin fashion.
   * Previously one underloaded server would be filled before we moved on to the next underloaded
   * server, leading to clustering of young regions. Finally, we randomly shuffle underloaded
   * servers so that they receive offloaded regions relatively evenly across calls to
   * balanceCluster(). The algorithm is currently implemented as such:
   * <ol>
   * <li>Determine the two valid numbers of regions each server should have,
   * <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
   * <li>Iterate down the most loaded servers, shedding regions from each so each server hosts
   * exactly <b>MAX</b> regions. Stop once you reach a server that already has &lt;= <b>MAX</b>
   * regions.
   * <p>
   * Order the regions to move from most recent to least.
   * <li>Iterate down the least loaded servers, assigning regions so each server has exactly
   * <b>MIN</b> regions. Stop once you reach a server that already has &gt;= <b>MIN</b> regions.
   * Regions being assigned to underloaded servers are those that were shed in the previous step.
   * It is possible that there were not enough regions shed to fill each underloaded server to
   * <b>MIN</b>. If so we end up with a number of regions still required to do so,
   * <b>neededRegions</b>. It is also possible that we were able to fill each underloaded server
   * but ended up with regions that were shed from overloaded servers and still have no assignment.
   * If neither of these conditions hold (no regions needed to fill the underloaded servers, no
   * regions leftover from overloaded servers), we are done and return. Otherwise we handle these
   * cases below.
   * <li>If <b>neededRegions</b> is non-zero (we still have underloaded servers), we iterate the
   * most loaded servers again, shedding a single region from each (this brings them from having
   * <b>MAX</b> regions to having <b>MIN</b> regions).
   * <li>We now definitely have more regions that need assignment, either from the previous step or
   * from the original shedding from overloaded servers. Iterate the least loaded servers filling
   * each to <b>MIN</b>.
   * <li>If we still have more regions that need assignment, again iterate the least loaded servers,
   * this time giving each one region (filling them to <b>MAX</b>) until we run out.
   * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions. In addition, any server
   * hosting &gt;= <b>MAX</b> regions is guaranteed to end up with <b>MAX</b> regions at the end of
   * the balancing. This ensures the minimal number of regions possible are moved.
   * </ol>
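   * <p/>
   * As a small illustrative example (numbers are hypothetical): with 4 servers and 14 regions the
   * average is 3.5, so <b>MIN</b>=3 and <b>MAX</b>=4. A server holding 6 regions sheds 2, and two
   * servers holding 2 regions each receive one of the shed regions, leaving loads of 4, 4, 3, 3.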
   * TODO: We can at-most reassign the number of regions away from a particular server to be how
   * many they report as most loaded. Should we just keep all assignment in memory? Any objections?
   * Does this mean we need HeapSize on HMaster? Or just careful monitor? (current thinking is we
   * will hold all assignments in memory)
   * @param loadOfOneTable Map of regionservers and their load/region information to a list of their
   *                       most loaded regions
   * @return a list of regions to be moved, including source and destination, or null if cluster is
   *         already balanced
   */
  @Override
  protected List<RegionPlan> balanceTable(TableName tableName,
    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
    long startTime = EnvironmentEdgeManager.currentTime();

    // construct a Cluster object with clusterMap and rest of the
    // argument as defaults
    BalancerClusterState c =
      new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager);
    if (!needsBalance(c) && !this.overallNeedsBalance()) {
      return null;
    }
    ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
    int numServers = cs.getNumServers();
    NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
    int numRegions = cs.getNumRegions();
    float average = cs.getLoadAverage();
    int max = (int) Math.ceil(average);
    int min = (int) average;
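    // For example (illustrative): average = 3.5 gives max = 4 and min = 3; an integer average
    // gives max == min.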

    // Log the balance parameters so the balance result can be checked.
    StringBuilder strBalanceParam = new StringBuilder();
    strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
      .append(", numServers=").append(numServers).append(", max=").append(max).append(", min=")
      .append(min);
    LOG.debug(strBalanceParam.toString());

    // Balance the cluster
    // TODO: Look at data block locality or a more complex load to do this
    MinMaxPriorityQueue<RegionPlan> regionsToMove =
      MinMaxPriorityQueue.orderedBy(rpComparator).create();
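    // Guava's MinMaxPriorityQueue (HBASE-3609) lets us take plans from either end of the queue:
    // new regions from the head, old regions from the tail.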
    List<RegionPlan> regionsToReturn = new ArrayList<>();

    // Walk down most loaded, pruning each to the max
    int serversOverloaded = 0;
    // flag used to fetch regions from head and tail of list, alternately
    boolean fetchFromTail = false;
    Map<ServerName, BalanceInfo> serverBalanceInfo = new TreeMap<>();
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
      .entrySet()) {
      ServerAndLoad sal = server.getKey();
      int load = sal.getLoad();
      if (load <= max) {
        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue()));
        continue;
      }
      serversOverloaded++;
      List<RegionInfo> regions = server.getValue();
      int numToOffload = Math.min(load - max, regions.size());
      // account for the out-of-band regions which were assigned to this server
      // after some other region server crashed
      Collections.sort(regions, riComparator);
      int numTaken = 0;
      for (int i = 0; i <= numToOffload;) {
        RegionInfo hri = regions.get(i); // fetch from head
        if (fetchFromTail) {
          hri = regions.get(regions.size() - 1 - i);
        }
        i++;
        regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
        numTaken++;
        if (numTaken >= numToOffload) {
          break;
        }
      }
      serverBalanceInfo.put(sal.getServerName(),
        new BalanceInfo(numToOffload, -numTaken, server.getValue()));
    }
    int totalNumMoved = regionsToMove.size();

    // Walk down least loaded, filling each to the min
    int neededRegions = 0; // number of regions needed to bring all up to min
    fetchFromTail = false;

    Map<ServerName, Integer> underloadedServers = new HashMap<>();
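    // Upper bound on how many region slots we will register for underloaded servers in this pass.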
    int maxToTake = numRegions - min;
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
      if (maxToTake == 0) {
        break; // no more to take
      }
      int load = server.getKey().getLoad();
      if (load >= min) {
        continue; // look for other servers which haven't reached min
      }
      int regionsToPut = min - load;
      maxToTake -= regionsToPut;
      underloadedServers.put(server.getKey().getServerName(), regionsToPut);
    }
    // number of servers that get new regions
    int serversUnderloaded = underloadedServers.size();
    int incr = 1;
    List<ServerName> sns =
      Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
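    // Shuffle so that underloaded servers receive offloaded regions relatively evenly across
    // successive calls to balanceCluster().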
    Collections.shuffle(sns);
    while (regionsToMove.size() > 0) {
      int cnt = 0;
      int i = incr > 0 ? 0 : underloadedServers.size() - 1;
      for (; i >= 0 && i < underloadedServers.size(); i += incr) {
        if (regionsToMove.isEmpty()) {
          break;
        }
        ServerName si = sns.get(i);
        int numToTake = underloadedServers.get(si);
        if (numToTake == 0) {
          continue;
        }

        addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);

        underloadedServers.put(si, numToTake - 1);
        cnt++;
        BalanceInfo bi = serverBalanceInfo.get(si);
        bi.setNumRegionsAdded(bi.getNumRegionsAdded() + 1);
      }
      if (cnt == 0) {
        break;
      }
      // iterates underloadedServers in the other direction
      incr = -incr;
    }
    for (Integer i : underloadedServers.values()) {
      // If we still want to take some, increment needed
      neededRegions += i;
    }

    // Need to do a second pass.
    // Either more regions to assign out or servers that are still underloaded

    // If we need more to fill min, grab one from each most loaded until enough
    if (neededRegions != 0) {
      // Walk down most loaded, grabbing one from each until we get enough
      for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
        .entrySet()) {
        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
        int idx = balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
        if (idx >= server.getValue().size()) {
          break;
        }
        RegionInfo region = server.getValue().get(idx);
        if (region.isMetaRegion()) {
          continue; // Don't move meta regions.
        }
        regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
        balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
        totalNumMoved++;
        if (--neededRegions == 0) {
          // No more regions needed, done shedding
          break;
        }
      }
    }

    // Now we have a set of regions that must be all assigned out
    // Assign each underloaded up to the min, then if leftovers, assign to max

    // Walk down least loaded, assigning to each to fill up to min
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
      int regionCount = server.getKey().getLoad();
      if (regionCount >= min) {
        break;
      }
      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
      if (balanceInfo != null) {
        regionCount += balanceInfo.getNumRegionsAdded();
      }
      if (regionCount >= min) {
        continue;
      }
      int numToTake = min - regionCount;
      int numTaken = 0;
      while (numTaken < numToTake && 0 < regionsToMove.size()) {
        addRegionPlan(regionsToMove, fetchFromTail, server.getKey().getServerName(),
          regionsToReturn);
        numTaken++;
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() + 1);
      }
    }

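    // When min == max, the per-table average is an integer and every server must hold exactly
    // that many regions to stay table-balanced, so there is no slack for the overall pass.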
    if (min != max) {
      balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
    }

    long endTime = EnvironmentEdgeManager.currentTime();

    if (!regionsToMove.isEmpty() || neededRegions != 0) {
      // Emit data so we can diagnose how the balancer went astray.
      LOG.warn(
        "regionsToMove=" + totalNumMoved + ", numServers=" + numServers + ", serversOverloaded="
          + serversOverloaded + ", serversUnderloaded=" + serversUnderloaded);
      StringBuilder sb = new StringBuilder();
      for (Map.Entry<ServerName, List<RegionInfo>> e : loadOfOneTable.entrySet()) {
        if (sb.length() > 0) {
          sb.append(", ");
        }
        sb.append(e.getKey().toString());
        sb.append(" ");
        sb.append(e.getValue().size());
      }
      LOG.warn("Input " + sb.toString());
    }

    // All done!
    LOG.info("Done. Calculated a load balance in " + (endTime - startTime) + "ms. " + "Moving "
      + totalNumMoved + " regions off of " + serversOverloaded + " overloaded servers onto "
      + serversUnderloaded + " less loaded servers");

    return regionsToReturn;
  }

  /**
   * If we need to balance overall, we add one more round to peel off one region from each server
   * holding the max number of regions. Together with the other regions left to be assigned, we
   * distribute all of regionsToMove to the region servers that hold fewer regions in the whole
   * cluster scope.
   */
  private void balanceOverall(List<RegionPlan> regionsToReturn,
    Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
    MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min) {
    // Step 1.
    // A map recording the region plans we have produced so far, used to resolve a cyclic
    // assignment pair,
    // e.g. plan 1: A -> B, plan 2: B -> C => resolve plan 1 to A -> C, remove plan 2
    Map<ServerName, List<Integer>> returnMap = new HashMap<>();
    for (int i = 0; i < regionsToReturn.size(); i++) {
      List<Integer> pos = returnMap.get(regionsToReturn.get(i).getDestination());
      if (pos == null) {
        pos = new ArrayList<>();
        returnMap.put(regionsToReturn.get(i).getDestination(), pos);
      }
      pos.add(i);
    }

    // Step 2.
    // Peel off one region from each RS which has the max number of regions now.
    // Each RS should have either max or min numbers of regions for this table.
    for (int i = 0; i < serverLoadList.size(); i++) {
      ServerAndLoad serverload = serverLoadList.get(i);
      BalanceInfo balanceInfo = serverBalanceInfo.get(serverload.getServerName());
      if (balanceInfo == null) {
        continue;
      }
      setLoad(serverLoadList, i, balanceInfo.getNumRegionsAdded());
      if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() == max) {
        RegionInfo hriToPlan;
        if (balanceInfo.getHriList().isEmpty()) {
          LOG.debug("During balanceOverall, we found " + serverload.getServerName()
            + " has no RegionInfo, no operation needed");
          continue;
        } else if (balanceInfo.getNextRegionForUnload() >= balanceInfo.getHriList().size()) {
          continue;
        } else {
          hriToPlan = balanceInfo.getHriList().get(balanceInfo.getNextRegionForUnload());
        }
        RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
        regionsToMove.add(maxPlan);
        setLoad(serverLoadList, i, -1);
      } else if (
        balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
          || balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min
      ) {
        LOG.warn(
          "Encounter incorrect region numbers after calculating move plan during balanceOverall, "
            + "for this table, " + serverload.getServerName() + " originally has "
            + balanceInfo.getHriList().size() + " regions and " + balanceInfo.getNumRegionsAdded()
            + " regions have been added. Yet, max =" + max + ", min =" + min
            + ". Thus stop balance for this table"); // should not happen
        return;
      }
    }
    // Step 3. Sort serverLoadList, the ArrayList holding the overall load of each server.
    // We only need to assign the regionsToMove to
    // the first n = regionsToMove.size() region servers that have the least load.
    Collections.sort(serverLoadList, new Comparator<ServerAndLoad>() {
      @Override
      public int compare(ServerAndLoad s1, ServerAndLoad s2) {
        if (s1.getLoad() == s2.getLoad()) {
          return 0;
        } else {
          return (s1.getLoad() > s2.getLoad()) ? 1 : -1;
        }
      }
    });

    // Step 4.
    // Preparation before assigning out all regionsToMove.
    // We need to remove the plans whose source RS equals the destination RS,
    // since such a source RS belongs to the least n loaded RSes.
    int assignLength = regionsToMove.size();
    // A structure to help map a ServerName to its load and index in serverLoadList
    Map<ServerName, Pair<ServerAndLoad, Integer>> snLoadMap = new HashMap<>();
    for (int i = 0; i < serverLoadList.size(); i++) {
      snLoadMap.put(serverLoadList.get(i).getServerName(), new Pair<>(serverLoadList.get(i), i));
    }
    Pair<ServerAndLoad, Integer> shedLoad;
    // A List to help mark the plans in regionsToMove that should be removed
    List<RegionPlan> planToRemoveList = new ArrayList<>();
    // A structure to record how many times a server becomes the source of a plan, from
    // regionsToMove.
    Map<ServerName, Integer> sourceMap = new HashMap<>();
    // We remove one of the plans which would cause the source RS to equal the destination RS,
    // but keep in mind that the second plan from such an RS should be kept.
    for (RegionPlan plan : regionsToMove) {
      // the source RS's load and index in serverLoadList
      shedLoad = snLoadMap.get(plan.getSource());
      if (!sourceMap.containsKey(plan.getSource())) {
        sourceMap.put(plan.getSource(), 0);
      }
      sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
      if (shedLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
        planToRemoveList.add(plan);
        // While marked as to be removed, the count should be added back to the source RS
        setLoad(serverLoadList, shedLoad.getSecond(), 1);
      }
    }
    // Remove those marked plans from regionsToMove; we cannot remove them directly while
    // iterating through regionsToMove, because regionsToMove is a MinMaxPriorityQueue.
    for (RegionPlan planToRemove : planToRemoveList) {
      regionsToMove.remove(planToRemove);
    }

    // Step 5.
    // We only need to assign the regionsToMove to
    // the first n = regionsToMove.size() of them, with the least load.
    // With this strategy adopted, we can gradually achieve the overall balance,
    // while keeping the table level balanced.
    for (int i = 0; i < assignLength; i++) {
      // skip the RS that is also a source; we have removed its plan from regionsToMove in the
      // previous step
      if (sourceMap.containsKey(serverLoadList.get(i).getServerName())) {
        continue;
      }
      addRegionPlan(regionsToMove, fetchFromTail, serverLoadList.get(i).getServerName(),
        regionsToReturn);
      setLoad(serverLoadList, i, 1);
      // resolve a possible cyclic assignment pair if we just produced one:
      // e.g. plan 1: A -> B, plan 2: B -> C => resolve plan 1 to A -> C and remove plan 2
      List<Integer> pos =
        returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
      if (pos != null && pos.size() != 0) {
        regionsToReturn.get(pos.get(pos.size() - 1))
          .setDestination(regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
        pos.remove(pos.size() - 1);
        regionsToReturn.remove(regionsToReturn.size() - 1);
      }
    }
    // Done balance overall
  }

  /**
   * Remove a region plan from the head or tail of regionsToMove, set its destination to the given
   * server, and add it to the list of regions to return.
   */
  private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
    final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
    RegionPlan rp = null;
    if (!fetchFromTail) {
      rp = regionsToMove.remove();
    } else {
      rp = regionsToMove.removeLast();
    }
    rp.setDestination(sn);
    regionsToReturn.add(rp);
  }
}