/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;

/**
 * Makes decisions about the placement and movement of Regions across
 * RegionServers.
 *
 * <p>Cluster-wide load balancing will occur only when there are no regions in
 * transition and according to a fixed period of time, using {@link #balanceCluster(Map)}.
 *
 * <p>On cluster startup, bulk assignment can be used to determine
 * locations for all Regions in a cluster.
 *
 * <p>This class produces plans for the
 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to execute.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SimpleLoadBalancer extends BaseLoadBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleLoadBalancer.class);
  private static final Random RANDOM = new Random(System.currentTimeMillis());

  private RegionInfoComparator riComparator = new RegionInfoComparator();
  private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
  private float avgLoadOverall;
  private List<ServerAndLoad> serverLoadList = new ArrayList<>();

  /**
   * Stores additional per-server information about the regions added/removed
   * during the run of the balancing algorithm.
   *
   * For servers that shed regions, we need to track which regions we have already
   * shed. <b>nextRegionForUnload</b> contains the index in the list of regions on
   * the server that is the next to be shed.
   */
  static class BalanceInfo {

    private int nextRegionForUnload;
    private int numRegionsAdded;
    private List<RegionInfo> hriList;

    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded, List<RegionInfo> hriList) {
      this.nextRegionForUnload = nextRegionForUnload;
      this.numRegionsAdded = numRegionsAdded;
      this.hriList = hriList;
    }

    int getNextRegionForUnload() {
      return nextRegionForUnload;
    }

    int getNumRegionsAdded() {
      return numRegionsAdded;
    }

    void setNumRegionsAdded(int numAdded) {
      this.numRegionsAdded = numAdded;
    }

    List<RegionInfo> getHriList() {
      return hriList;
    }

    void setNextRegionForUnload(int nextRegionForUnload) {
      this.nextRegionForUnload = nextRegionForUnload;
    }

  }

  /**
   * Pass RegionStates and allow balancer to set the current cluster load.
   */
  void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
    serverLoadList.clear();
    Map<ServerName, Integer> server2LoadMap = new HashMap<>();
    float sum = 0;
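    // Sum each server's region count across all tables so that avgLoadOverall below
    // reflects the cluster-wide load rather than the load of any single table.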
    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad
        .entrySet()) {
      for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterEntry.getValue().entrySet()) {
        if (entry.getKey().equals(masterServerName)) {
          continue; // we shouldn't include master as potential assignee
        }
        int regionNum = entry.getValue().size();
        server2LoadMap.compute(entry.getKey(), (k, v) -> v == null ? regionNum : regionNum + v);
        sum += regionNum;
      }
    }
    server2LoadMap.forEach((k, v) -> {
      serverLoadList.add(new ServerAndLoad(k, v));
    });
    avgLoadOverall = sum / serverLoadList.size();
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    float originSlop = slop;
    float originOverallSlop = overallSlop;
    super.setConf(conf);
    LOG.info("Update configuration of SimpleLoadBalancer, previous slop is "
        + originSlop + ", current slop is " + slop + ", previous overallSlop is "
        + originOverallSlop + ", current overallSlop is " + overallSlop);
  }

  private void setLoad(List<ServerAndLoad> slList, int i, int loadChange) {
    ServerAndLoad newsl =
        new ServerAndLoad(slList.get(i).getServerName(), slList.get(i).getLoad() + loadChange);
    slList.set(i, newsl);
  }

  /**
   * A checker function to decide whether, when overall balancing is wanted and a certain table
   * has already been balanced, we still need to re-distribute regions of this table to achieve
   * the overall-balanced state.
   * @return true if this table should be balanced.
   */
  private boolean overallNeedsBalance() {
    int floor = (int) Math.floor(avgLoadOverall * (1 - overallSlop));
    int ceiling = (int) Math.ceil(avgLoadOverall * (1 + overallSlop));
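    // The cluster is balanced at the overall level when every server's total load
    // falls within [floor, ceiling] of the cluster-wide average.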
    int max = 0, min = Integer.MAX_VALUE;
    for (ServerAndLoad server : serverLoadList) {
      max = Math.max(server.getLoad(), max);
      min = Math.min(server.getLoad(), min);
    }
    if (max <= ceiling && min >= floor) {
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because cluster is balanced at overall level");
      }
      return false;
    }
    return true;
  }

  /**
   * Generate a global load balancing plan according to the specified map of
   * server information to the most loaded regions of each server.
   *
   * The load balancing invariant is that all servers are within 1 region of the
   * average number of regions per server.  If the average is an integer number,
   * all servers will be balanced to the average.  Otherwise, all servers will
   * have either floor(average) or ceiling(average) regions.
   *
   * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
   *   we can fetch from both ends of the queue.
   * At the beginning, we check whether there is an empty region server
   *   just discovered by the Master. If so, we alternately choose new / old
   *   regions from head / tail of regionsToMove, respectively. This alternation
   *   avoids clustering young regions on the newly discovered region server.
   *   Otherwise, we choose new regions from head of regionsToMove.
   *
   * Another improvement from HBASE-3609 is that we assign regions from
   *   regionsToMove to underloaded servers in round-robin fashion.
   *   Previously one underloaded server would be filled before we move onto
   *   the next underloaded server, leading to clustering of young regions.
   *
   * Finally, we randomly shuffle underloaded servers so that they receive
   *   offloaded regions relatively evenly across calls to balanceCluster().
   *
   * The algorithm is currently implemented as such:
   *
   * <ol>
   * <li>Determine the two valid numbers of regions each server should have,
   *     <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
   *
   * <li>Iterate down the most loaded servers, shedding regions from each so
   *     each server hosts exactly <b>MAX</b> regions.  Stop once you reach a
   *     server that already has &lt;= <b>MAX</b> regions.
   *     <p>
   *     Order the regions to move from most recent to least.
   *
   * <li>Iterate down the least loaded servers, assigning regions so each server
   *     has exactly <b>MIN</b> regions.  Stop once you reach a server that
   *     already has &gt;= <b>MIN</b> regions.
   *
   *     Regions being assigned to underloaded servers are those that were shed
   *     in the previous step.  It is possible that there were not enough
   *     regions shed to fill each underloaded server to <b>MIN</b>.  If so we
   *     end up with a number of regions required to do so, <b>neededRegions</b>.
   *
   *     It is also possible that we were able to fill each underloaded server
   *     but ended up with regions that were unassigned from overloaded servers
   *     and that still do not have an assignment.
   *
   *     If neither of these conditions hold (no regions needed to fill the
   *     underloaded servers, no regions leftover from overloaded servers),
   *     we are done and return.  Otherwise we handle these cases below.
   *
   * <li>If <b>neededRegions</b> is non-zero (still have underloaded servers),
   *     we iterate the most loaded servers again, shedding a single region from
   *     each (this brings them from having <b>MAX</b> regions to having
   *     <b>MIN</b> regions).
   *
   * <li>We now definitely have more regions that need assignment, either from
   *     the previous step or from the original shedding from overloaded servers.
   *     Iterate the least loaded servers filling each to <b>MIN</b>.
   *
   * <li>If we still have more regions that need assignment, again iterate the
   *     least loaded servers, this time giving each one (filling them to
   *     <b>MAX</b>) until we run out.
   *
   * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions.
   *
   *     In addition, any server hosting &gt;= <b>MAX</b> regions is guaranteed
   *     to end up with <b>MAX</b> regions at the end of the balancing.  This
   *     ensures the minimal number of regions possible are moved.
   * </ol>
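   *
   * <p>For example, with 31 regions spread over 4 servers the average is 7.75, so
   * <b>MIN</b>=7 and <b>MAX</b>=8; after balancing, every server hosts either 7 or
   * 8 regions.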
   *
   * TODO: We can at-most reassign the number of regions away from a particular
   *       server to be how many they report as most loaded.
   *       Should we just keep all assignment in memory?  Any objections?
   *       Does this mean we need HeapSize on HMaster?  Or just careful monitor?
   *       (current thinking is we will hold all assignments in memory)
   *
   * @param loadOfOneTable Map of regionservers and their load/region information to
   *                   a list of their most loaded regions
   * @return a list of regions to be moved, including source and destination,
   *         or null if cluster is already balanced
   */
  @Override
  public List<RegionPlan> balanceTable(TableName tableName,
      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
    List<RegionPlan> regionsToReturn = balanceMasterRegions(loadOfOneTable);
    if (regionsToReturn != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
      return regionsToReturn;
    }
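    // If the active master also carries regions of this table, balance the remaining servers
    // without it; with only one other server there is nothing left to balance.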
    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
      if (loadOfOneTable.size() <= 2) {
        return null;
      }
      loadOfOneTable = new HashMap<>(loadOfOneTable);
      loadOfOneTable.remove(masterServerName);
    }

    long startTime = System.currentTimeMillis();

    // construct a Cluster object with clusterMap and rest of the
    // argument as defaults
    Cluster c = new Cluster(loadOfOneTable, null, this.regionFinder, this.rackManager);
    if (!this.needsBalance(tableName, c) && !this.overallNeedsBalance()) {
      return null;
    }
    ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
    int numServers = cs.getNumServers();
    NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
    int numRegions = cs.getNumRegions();
    float average = cs.getLoadAverage();
    int max = (int) Math.ceil(average);
    int min = (int) average;

    // Log the balance parameters so the balance result can be checked.
    StringBuilder strBalanceParam = new StringBuilder();
    strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
        .append(", numServers=").append(numServers).append(", max=").append(max)
        .append(", min=").append(min);
    LOG.debug(strBalanceParam.toString());

    // Balance the cluster
    // TODO: Look at data block locality or a more complex load to do this
    MinMaxPriorityQueue<RegionPlan> regionsToMove =
      MinMaxPriorityQueue.orderedBy(rpComparator).create();
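    // A MinMaxPriorityQueue lets us take plans from either end of the ordering (HBASE-3609).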
    regionsToReturn = new ArrayList<>();

    // Walk down most loaded, pruning each to the max
    int serversOverloaded = 0;
    // flag used to fetch regions from head and tail of list, alternately
    boolean fetchFromTail = false;
    Map<ServerName, BalanceInfo> serverBalanceInfo = new TreeMap<>();
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server:
        serversByLoad.descendingMap().entrySet()) {
      ServerAndLoad sal = server.getKey();
      int load = sal.getLoad();
      if (load <= max) {
        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue()));
        continue;
      }
      serversOverloaded++;
      List<RegionInfo> regions = server.getValue();
      int numToOffload = Math.min(load - max, regions.size());
      // account for the out-of-band regions which were assigned to this server
      // after some other region server crashed
      Collections.sort(regions, riComparator);
      int numTaken = 0;
      for (int i = 0; i <= numToOffload; ) {
        RegionInfo hri = regions.get(i); // fetch from head
        if (fetchFromTail) {
          hri = regions.get(regions.size() - 1 - i);
        }
        i++;
        // Don't rebalance special regions.
        if (shouldBeOnMaster(hri)
            && masterServerName.equals(sal.getServerName())) continue;
        regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
        numTaken++;
        if (numTaken >= numToOffload) break;
      }
      serverBalanceInfo.put(sal.getServerName(),
          new BalanceInfo(numToOffload, (-1) * numTaken, server.getValue()));
    }
    int totalNumMoved = regionsToMove.size();

    // Walk down least loaded, filling each to the min
    int neededRegions = 0; // number of regions needed to bring all up to min
    fetchFromTail = false;

    Map<ServerName, Integer> underloadedServers = new HashMap<>();
    int maxToTake = numRegions - min;
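    // maxToTake caps the total number of regions the underloaded servers may request.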
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server:
        serversByLoad.entrySet()) {
      if (maxToTake == 0) break; // no more to take
      int load = server.getKey().getLoad();
      if (load >= min) {
        continue; // look for other servers which haven't reached min
      }
      int regionsToPut = min - load;
      maxToTake -= regionsToPut;
      underloadedServers.put(server.getKey().getServerName(), regionsToPut);
    }
    // number of servers that get new regions
    int serversUnderloaded = underloadedServers.size();
    int incr = 1;
    List<ServerName> sns =
      Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
    Collections.shuffle(sns, RANDOM);
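    // Hand out shed regions to the shuffled underloaded servers in round-robin fashion,
    // reversing direction each pass so new regions do not cluster on a single server.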
    while (regionsToMove.size() > 0) {
      int cnt = 0;
      int i = incr > 0 ? 0 : underloadedServers.size() - 1;
      for (; i >= 0 && i < underloadedServers.size(); i += incr) {
        if (regionsToMove.isEmpty()) break;
        ServerName si = sns.get(i);
        int numToTake = underloadedServers.get(si);
        if (numToTake == 0) continue;

        addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);

        underloadedServers.put(si, numToTake - 1);
        cnt++;
        BalanceInfo bi = serverBalanceInfo.get(si);
        bi.setNumRegionsAdded(bi.getNumRegionsAdded() + 1);
      }
      if (cnt == 0) break;
      // iterates underloadedServers in the other direction
      incr = -incr;
    }
    for (Integer i : underloadedServers.values()) {
      // If we still want to take some, increment needed
      neededRegions += i;
    }

    // Need to do a second pass.
    // Either more regions to assign out or servers that are still underloaded

    // If we need more to fill min, grab one from each most loaded until enough
    if (neededRegions != 0) {
      // Walk down most loaded, grabbing one from each until we get enough
      for (Map.Entry<ServerAndLoad, List<RegionInfo>> server :
          serversByLoad.descendingMap().entrySet()) {
        BalanceInfo balanceInfo =
          serverBalanceInfo.get(server.getKey().getServerName());
        int idx =
          balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
        if (idx >= server.getValue().size()) break;
        RegionInfo region = server.getValue().get(idx);
        if (region.isMetaRegion()) continue; // Don't move meta regions.
        regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
        balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
        totalNumMoved++;
        if (--neededRegions == 0) {
          // No more regions needed, done shedding
          break;
        }
      }
    }

    // Now we have a set of regions that must be all assigned out
    // Assign each underloaded up to the min, then if leftovers, assign to max

    // Walk down least loaded, assigning to each to fill up to min
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server :
        serversByLoad.entrySet()) {
      int regionCount = server.getKey().getLoad();
      if (regionCount >= min) break;
      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
      if (balanceInfo != null) {
        regionCount += balanceInfo.getNumRegionsAdded();
      }
      if (regionCount >= min) {
        continue;
      }
      int numToTake = min - regionCount;
      int numTaken = 0;
      while (numTaken < numToTake && 0 < regionsToMove.size()) {
        addRegionPlan(regionsToMove, fetchFromTail,
          server.getKey().getServerName(), regionsToReturn);
        numTaken++;
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() + 1);
      }
    }

    if (min != max) {
      balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
    }

    long endTime = System.currentTimeMillis();

    if (!regionsToMove.isEmpty() || neededRegions != 0) {
      // Emit data so can diagnose how balancer went astray.
      LOG.warn("regionsToMove=" + totalNumMoved +
        ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
        ", serversUnderloaded=" + serversUnderloaded);
      StringBuilder sb = new StringBuilder();
      for (Map.Entry<ServerName, List<RegionInfo>> e: loadOfOneTable.entrySet()) {
        if (sb.length() > 0) sb.append(", ");
        sb.append(e.getKey().toString());
        sb.append(" ");
        sb.append(e.getValue().size());
      }
      LOG.warn("Input " + sb.toString());
    }

    // All done!
    LOG.info("Done. Calculated a load balance in " + (endTime - startTime) + "ms. " +
        "Moving " + totalNumMoved + " regions off of " +
        serversOverloaded + " overloaded servers onto " +
        serversUnderloaded + " less loaded servers");

    return regionsToReturn;
  }

  /**
   * If we need to balance overall, we add one more round to peel off one region from each server
   * that is at max. Together with the other regions left to be assigned, we distribute all
   * regionsToMove to the RegionServers that host fewer regions across the whole cluster.
   */
  public void balanceOverall(List<RegionPlan> regionsToReturn,
      Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
      MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min) {
    // Step 1.
    // A map to record the plans we already have, used later to resolve a cyclic assignment pair,
    // e.g. plan 1: A -> B, plan 2: B -> C => resolve plan 1 to A -> C and remove plan 2
    Map<ServerName, List<Integer>> returnMap = new HashMap<>();
    for (int i = 0; i < regionsToReturn.size(); i++) {
      List<Integer> pos = returnMap.get(regionsToReturn.get(i).getDestination());
      if (pos == null) {
        pos = new ArrayList<>();
        returnMap.put(regionsToReturn.get(i).getDestination(), pos);
      }
      pos.add(i);
    }

    // Step 2.
    // Peel off one region from each RS which has max number of regions now.
    // Each RS should have either max or min numbers of regions for this table.
    for (int i = 0; i < serverLoadList.size(); i++) {
      ServerAndLoad serverload = serverLoadList.get(i);
      BalanceInfo balanceInfo = serverBalanceInfo.get(serverload.getServerName());
      if (balanceInfo == null) {
        continue;
      }
      setLoad(serverLoadList, i, balanceInfo.getNumRegionsAdded());
      if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() == max) {
        RegionInfo hriToPlan;
        if (balanceInfo.getHriList().isEmpty()) {
          LOG.debug("During balanceOverall, we found " + serverload.getServerName()
              + " has no RegionInfo, no operation needed");
          continue;
        } else if (balanceInfo.getNextRegionForUnload() >= balanceInfo.getHriList().size()) {
          continue;
        } else {
          hriToPlan = balanceInfo.getHriList().get(balanceInfo.getNextRegionForUnload());
        }
        RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
        regionsToMove.add(maxPlan);
        setLoad(serverLoadList, i, -1);
      } else if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
          || balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min) {
        LOG.warn("Encountered incorrect region counts after calculating the move plan during"
            + " balanceOverall; for this table, " + serverload.getServerName() + " originally has "
            + balanceInfo.getHriList().size() + " regions and " + balanceInfo.getNumRegionsAdded()
            + " regions have been added. Yet, max=" + max + ", min=" + min
            + ". Thus stopping balance for this table"); // should not happen
        return;
      }
    }

    // Step 3. Sort serverLoadList, the ArrayList holding the overall load of each server.
    // We only need to assign the regionsToMove to
    // the first n = regionsToMove.size() RSes that have the least load.
    serverLoadList.sort(Comparator.comparingInt(ServerAndLoad::getLoad));

    // Step 4.
    // Preparation before assigning out all regionsToMove.
    // We need to remove the plans whose source RS equals the destination RS,
    // since the source RS belongs to the n least loaded RSes.
    int assignLength = regionsToMove.size();
    // A structure to map a ServerName to its load and index in serverLoadList
    Map<ServerName, Pair<ServerAndLoad, Integer>> snLoadMap = new HashMap<>();
    for (int i = 0; i < serverLoadList.size(); i++) {
      snLoadMap.put(serverLoadList.get(i).getServerName(), new Pair<>(serverLoadList.get(i), i));
    }
    Pair<ServerAndLoad, Integer> shredLoad;
    // A List to help mark the plans in regionsToMove that should be removed
    List<RegionPlan> planToRemoveList = new ArrayList<>();
    // A structure to record how many times a server becomes the source of a plan in regionsToMove.
    Map<ServerName, Integer> sourceMap = new HashMap<>();
    // We remove one plan whose source RS equals its destination RS,
    // but keep in mind that any later plans from the same RS should be kept.
    for (RegionPlan plan : regionsToMove) {
      // the source RS's load and index in serverLoadList
      shredLoad = snLoadMap.get(plan.getSource());
      if (!sourceMap.containsKey(plan.getSource())) sourceMap.put(plan.getSource(), 0);
      sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
      if (shredLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
        planToRemoveList.add(plan);
        // While marked as to be removed, the count should be added back to the source RS
        setLoad(serverLoadList, shredLoad.getSecond(), 1);
      }
    }
    // Remove those marked plans from regionsToMove;
    // we cannot directly remove them while iterating through
    // regionsToMove, due to the fact that regionsToMove is a MinMaxPriorityQueue.
    for (RegionPlan planToRemove : planToRemoveList) {
      regionsToMove.remove(planToRemove);
    }

    // Step 5.
    // We only need to assign the regionsToMove to
    // the first n = regionsToMove.size() of them, with least load.
    // With this strategy adopted, we can gradually achieve the overall balance,
    // while keeping table level balanced.
    for (int i = 0; i < assignLength; i++) {
      // skip the RS that is also the source, we have removed them from regionsToMove in previous step
      if (sourceMap.containsKey(serverLoadList.get(i).getServerName())) continue;
      addRegionPlan(regionsToMove, fetchFromTail,
          serverLoadList.get(i).getServerName(), regionsToReturn);
      setLoad(serverLoadList, i, 1);
      // resolve a possible cyclic assignment pair if we just produced one:
      // e.g. plan1: A -> B, plan2: B -> C => resolve plan1 to A -> C and remove plan2
      List<Integer> pos =
          returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
      if (pos != null && pos.size() != 0) {
        regionsToReturn.get(pos.get(pos.size() - 1)).setDestination(
            regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
        pos.remove(pos.size() - 1);
        regionsToReturn.remove(regionsToReturn.size() - 1);
      }
    }
    // Done balance overall
  }

  /**
   * Add a region from the head or tail to the List of regions to return.
   */
  private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
      final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
    RegionPlan rp = null;
    if (!fetchFromTail) rp = regionsToMove.remove();
    else rp = regionsToMove.removeLast();
    rp.setDestination(sn);
    regionsToReturn.add(rp);
  }

  /**
   * Override to invoke {@link #setClusterLoad} before balancing. We need the cluster-wide load of
   * all regions on every server to achieve overall balance.
   */
  @Override
  public synchronized List<RegionPlan>
      balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
    setClusterLoad(loadOfAllTable);
    return super.balanceCluster(loadOfAllTable);
  }
}