/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;

/**
 * Makes decisions about the placement and movement of Regions across
 * RegionServers.
 *
 * <p>Cluster-wide load balancing will occur only when there are no regions in
 * transition; it runs at a fixed period via {@link #balanceCluster(Map)}.
 *
 * <p>On cluster startup, bulk assignment can be used to determine
 * locations for all Regions in a cluster.
 *
 * <p>This class produces plans for the
 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to execute.
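 *
 * <p>A minimal usage sketch (assuming a populated {@code clusterMap} of
 * {@link ServerName} to the {@link RegionInfo}s each server currently hosts):
 * <pre>
 *   SimpleLoadBalancer balancer = new SimpleLoadBalancer();
 *   balancer.setConf(conf);
 *   List&lt;RegionPlan&gt; plans = balancer.balanceCluster(clusterMap);
 *   // a null result means the cluster is already balanced
 * </pre>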
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SimpleLoadBalancer extends BaseLoadBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleLoadBalancer.class);
  private static final Random RANDOM = new Random(System.currentTimeMillis());

  private RegionInfoComparator riComparator = new RegionInfoComparator();
  private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
  private float avgLoadOverall;
  private List<ServerAndLoad> serverLoadList;

  /**
   * Stores additional per-server information about the regions added/removed
   * during the run of the balancing algorithm.
   *
   * For servers that shed regions, we need to track which regions we have already
   * shed. <b>nextRegionForUnload</b> contains the index in the list of regions on
   * the server that is the next to be shed.
   */
  static class BalanceInfo {

    private int nextRegionForUnload;
    private int numRegionsAdded;
    private List<RegionInfo> hriList;

    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded, List<RegionInfo> hriList) {
      this.nextRegionForUnload = nextRegionForUnload;
      this.numRegionsAdded = numRegionsAdded;
      this.hriList = hriList;
    }

    int getNextRegionForUnload() {
      return nextRegionForUnload;
    }

    int getNumRegionsAdded() {
      return numRegionsAdded;
    }

    void setNumRegionsAdded(int numAdded) {
      this.numRegionsAdded = numAdded;
    }

    List<RegionInfo> getHriList() {
      return hriList;
    }

    void setNextRegionForUnload(int nextRegionForUnload) {
      this.nextRegionForUnload = nextRegionForUnload;
    }

  }

  @Override
  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
    serverLoadList = new ArrayList<>();
    float sum = 0;
    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry :
        clusterLoad.entrySet()) {
      for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterEntry.getValue().entrySet()) {
        if (entry.getKey().equals(masterServerName)) {
          continue; // we shouldn't include the master as a potential assignee
        }
        serverLoadList.add(new ServerAndLoad(entry.getKey(), entry.getValue().size()));
        sum += entry.getValue().size();
      }
    }
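    // overall average load = total region count / number of candidate servers (master excluded)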
    avgLoadOverall = sum / serverLoadList.size();
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    float originSlop = slop;
    float originOverallSlop = overallSlop;
    super.setConf(conf);
    LOG.info("Update configuration of SimpleLoadBalancer, previous slop is "
            + originSlop + ", current slop is " + slop + ", previous overallSlop is "
            + originOverallSlop + ", current overallSlop is " + overallSlop);
  }

  private void setLoad(List<ServerAndLoad> slList, int i, int loadChange) {
    ServerAndLoad newsl =
        new ServerAndLoad(slList.get(i).getServerName(), slList.get(i).getLoad() + loadChange);
    slList.set(i, newsl);
  }

  /**
   * A checker function to decide, when we want overall balance and a certain table has already
   * been balanced, whether we still need to re-distribute regions of this table to achieve
   * the state of overall balance.
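   * <p>For example, with {@code avgLoadOverall = 10} and {@code overallSlop = 0.2}, per-server
   * loads between floor(10 * 0.8) = 8 and ceil(10 * 1.2) = 12 are considered balanced overall.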
   * @return true if this table should be balanced.
   */
  private boolean overallNeedsBalance() {
    int floor = (int) Math.floor(avgLoadOverall * (1 - overallSlop));
    int ceiling = (int) Math.ceil(avgLoadOverall * (1 + overallSlop));
    int max = 0, min = Integer.MAX_VALUE;
    for (ServerAndLoad server : serverLoadList) {
      max = Math.max(server.getLoad(), max);
      min = Math.min(server.getLoad(), min);
    }
    if (max <= ceiling && min >= floor) {
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because cluster is balanced at overall level");
      }
      return false;
    }
    return true;
  }

  /**
   * Generate a global load balancing plan according to the specified map of
   * server information to the most loaded regions of each server.
   *
   * The load balancing invariant is that all servers are within 1 region of the
   * average number of regions per server.  If the average is an integer number,
   * all servers will be balanced to the average.  Otherwise, all servers will
   * have either floor(average) or ceiling(average) regions.
   *
   * HBASE-3609 modeled regionsToMove using Guava's MinMaxPriorityQueue so that
   *   we can fetch from both ends of the queue.
   * At the beginning, we check whether there is an empty region server
   *   just discovered by the Master. If so, we alternately choose new / old
   *   regions from the head / tail of regionsToMove, respectively. This alternation
   *   avoids clustering young regions on the newly discovered region server.
   *   Otherwise, we choose new regions from the head of regionsToMove.
   *
   * Another improvement from HBASE-3609 is that we assign regions from
   *   regionsToMove to underloaded servers in round-robin fashion.
   *   Previously one underloaded server would be filled before we moved on to
   *   the next underloaded server, leading to clustering of young regions.
   *
   * Finally, we randomly shuffle underloaded servers so that they receive
   *   offloaded regions relatively evenly across calls to balanceCluster().
   *
   * The algorithm is currently implemented as such:
   *
   * <ol>
   * <li>Determine the two valid numbers of regions each server should have,
   *     <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
   *
   * <li>Iterate down the most loaded servers, shedding regions from each so
   *     each server hosts exactly <b>MAX</b> regions.  Stop once you reach a
   *     server that already has &lt;= <b>MAX</b> regions.
   *     <p>
   *     Order the regions to move from most recent to least.
   *
   * <li>Iterate down the least loaded servers, assigning regions so each server
   *     has exactly <b>MIN</b> regions.  Stop once you reach a server that
   *     already has &gt;= <b>MIN</b> regions.
   *
   *     Regions being assigned to underloaded servers are those that were shed
   *     in the previous step.  It is possible that there were not enough
   *     regions shed to fill each underloaded server to <b>MIN</b>.  If so we
   *     end up with a number of regions still required to do so,
   *     <b>neededRegions</b>.
   *
   *     It is also possible that we were able to fill each underloaded server
   *     but ended up with regions that were unassigned from overloaded servers
   *     and that still do not have assignment.
   *
   *     If neither of these conditions hold (no regions needed to fill the
   *     underloaded servers, no regions leftover from overloaded servers),
   *     we are done and return.  Otherwise we handle these cases below.
   *
   * <li>If <b>neededRegions</b> is non-zero (we still have underloaded servers),
   *     we iterate the most loaded servers again, shedding a single region from
   *     each (this brings them from having <b>MAX</b> regions to having
   *     <b>MIN</b> regions).
   *
   * <li>We now definitely have more regions that need assignment, either from
   *     the previous step or from the original shedding from overloaded servers.
   *     Iterate the least loaded servers filling each to <b>MIN</b>.
   *
   * <li>If we still have more regions that need assignment, again iterate the
   *     least loaded servers, this time giving each one more region (filling
   *     them to <b>MAX</b>) until we run out.
   *
   * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions.
   *
   *     In addition, any server hosting &gt;= <b>MAX</b> regions is guaranteed
   *     to end up with <b>MAX</b> regions at the end of the balancing.  This
   *     ensures the minimal number of regions possible are moved.
   * </ol>
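   *
   * <p>Worked example (hypothetical numbers): with 14 regions across 4 servers,
   * average = 3.5, so <b>MIN</b> = 3 and <b>MAX</b> = 4. A server hosting 6
   * regions sheds 2 (down to <b>MAX</b>), a server hosting 1 region receives 2
   * (up to <b>MIN</b>), and servers already holding 3 or 4 regions are untouched.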
   *
   * TODO: We can at most reassign the number of regions away from a particular
   *       server to be how many they report as most loaded.
   *       Should we just keep all assignments in memory?  Any objections?
   *       Does this mean we need HeapSize on HMaster?  Or just careful monitoring?
   *       (current thinking is we will hold all assignments in memory)
   *
   * @param clusterMap Map of regionservers and their load/region information to
   *                   a list of their most loaded regions
   * @return a list of regions to be moved, including source and destination,
   *         or null if the cluster is already balanced
   */
  @Override
  public List<RegionPlan> balanceCluster(
      Map<ServerName, List<RegionInfo>> clusterMap) {
    List<RegionPlan> regionsToReturn = balanceMasterRegions(clusterMap);
    if (regionsToReturn != null || clusterMap == null || clusterMap.size() <= 1) {
      return regionsToReturn;
    }
    if (masterServerName != null && clusterMap.containsKey(masterServerName)) {
      if (clusterMap.size() <= 2) {
        return null;
      }
      clusterMap = new HashMap<>(clusterMap);
      clusterMap.remove(masterServerName);
    }

    long startTime = System.currentTimeMillis();

    // construct a Cluster object with clusterMap and the rest of the
    // arguments as defaults
    Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
    if (!this.needsBalance(c) && !this.overallNeedsBalance()) {
      return null;
    }

    ClusterLoadState cs = new ClusterLoadState(clusterMap);
    int numServers = cs.getNumServers();
    NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
    int numRegions = cs.getNumRegions();
    float average = cs.getLoadAverage();
    int max = (int) Math.ceil(average);
    int min = (int) average;
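    // e.g. 14 regions across 4 servers: average = 3.5, so max = ceil(3.5) = 4 and min = floor(3.5) = 3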

    // Used to check the balance result.
    StringBuilder strBalanceParam = new StringBuilder();
    strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
        .append(", numServers=").append(numServers).append(", max=").append(max)
        .append(", min=").append(min);
    LOG.debug(strBalanceParam.toString());

    // Balance the cluster
    // TODO: Look at data block locality or a more complex load to do this
    MinMaxPriorityQueue<RegionPlan> regionsToMove =
      MinMaxPriorityQueue.orderedBy(rpComparator).create();
    regionsToReturn = new ArrayList<>();

    // Walk down most loaded, pruning each to the max
    int serversOverloaded = 0;
    // flag used to fetch regions from the head and tail of the list, alternately
    boolean fetchFromTail = false;
    Map<ServerName, BalanceInfo> serverBalanceInfo = new TreeMap<>();
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server:
        serversByLoad.descendingMap().entrySet()) {
      ServerAndLoad sal = server.getKey();
      int load = sal.getLoad();
      if (load <= max) {
        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue()));
        continue;
      }
      serversOverloaded++;
      List<RegionInfo> regions = server.getValue();
      int numToOffload = Math.min(load - max, regions.size());
      // account for the out-of-band regions which were assigned to this server
      // after some other region server crashed
      Collections.sort(regions, riComparator);
      int numTaken = 0;
      for (int i = 0; i <= numToOffload; ) {
        RegionInfo hri = regions.get(i); // fetch from head
        if (fetchFromTail) {
          hri = regions.get(regions.size() - 1 - i);
        }
        i++;
        // Don't rebalance special regions.
        if (shouldBeOnMaster(hri)
            && masterServerName.equals(sal.getServerName())) continue;
        regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
        numTaken++;
        if (numTaken >= numToOffload) break;
      }
      serverBalanceInfo.put(sal.getServerName(),
              new BalanceInfo(numToOffload, (-1) * numTaken, server.getValue()));
    }
    int totalNumMoved = regionsToMove.size();

    // Walk down least loaded, filling each to the min
    int neededRegions = 0; // number of regions needed to bring all up to min
    fetchFromTail = false;

    Map<ServerName, Integer> underloadedServers = new HashMap<>();
    int maxToTake = numRegions - min;
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server:
        serversByLoad.entrySet()) {
      if (maxToTake == 0) break; // no more to take
      int load = server.getKey().getLoad();
      if (load >= min) {
        continue; // look for other servers which haven't reached min
      }
      int regionsToPut = min - load;
      maxToTake -= regionsToPut;
      underloadedServers.put(server.getKey().getServerName(), regionsToPut);
    }
    // number of servers that get new regions
    int serversUnderloaded = underloadedServers.size();
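    // incr controls the direction of the sweep over the shuffled underloaded servers;
    // it flips after each pass so successive passes walk the server list in opposite directions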
    int incr = 1;
    List<ServerName> sns =
      Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
    Collections.shuffle(sns, RANDOM);
    while (regionsToMove.size() > 0) {
      int cnt = 0;
      int i = incr > 0 ? 0 : underloadedServers.size() - 1;
      for (; i >= 0 && i < underloadedServers.size(); i += incr) {
        if (regionsToMove.isEmpty()) break;
        ServerName si = sns.get(i);
        int numToTake = underloadedServers.get(si);
        if (numToTake == 0) continue;

        addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);

        underloadedServers.put(si, numToTake - 1);
        cnt++;
        BalanceInfo bi = serverBalanceInfo.get(si);
        bi.setNumRegionsAdded(bi.getNumRegionsAdded() + 1);
      }
      if (cnt == 0) break;
      // iterate underloadedServers in the other direction on the next pass
      incr = -incr;
    }
    for (Integer i : underloadedServers.values()) {
      // If we still want to take some, increment needed
      neededRegions += i;
    }

    // Need to do a second pass.
    // Either more regions to assign out or servers that are still underloaded

    // If we need more to fill min, grab one from each most loaded until enough
    if (neededRegions != 0) {
      // Walk down most loaded, grabbing one from each until we get enough
      for (Map.Entry<ServerAndLoad, List<RegionInfo>> server :
        serversByLoad.descendingMap().entrySet()) {
        BalanceInfo balanceInfo =
          serverBalanceInfo.get(server.getKey().getServerName());
        int idx =
          balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
        if (idx >= server.getValue().size()) break;
        RegionInfo region = server.getValue().get(idx);
        if (region.isMetaRegion()) continue; // Don't move meta regions.
        regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
        balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
        totalNumMoved++;
        if (--neededRegions == 0) {
          // No more regions needed, done shedding
          break;
        }
      }
    }

    // Now we have a set of regions that must all be assigned out
    // Assign each underloaded server up to the min, then if leftovers, assign to max

    // Walk down least loaded, assigning to each to fill up to min
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server :
        serversByLoad.entrySet()) {
      int regionCount = server.getKey().getLoad();
      if (regionCount >= min) break;
      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
      if (balanceInfo != null) {
        regionCount += balanceInfo.getNumRegionsAdded();
      }
      if (regionCount >= min) {
        continue;
      }
      int numToTake = min - regionCount;
      int numTaken = 0;
      while (numTaken < numToTake && 0 < regionsToMove.size()) {
        addRegionPlan(regionsToMove, fetchFromTail,
          server.getKey().getServerName(), regionsToReturn);
        numTaken++;
      }
    }

    if (min != max) {
      balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
    }

    long endTime = System.currentTimeMillis();

    if (!regionsToMove.isEmpty() || neededRegions != 0) {
      // Emit data so we can diagnose how the balancer went astray.
      LOG.warn("regionsToMove=" + totalNumMoved +
        ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
        ", serversUnderloaded=" + serversUnderloaded);
      StringBuilder sb = new StringBuilder();
      for (Map.Entry<ServerName, List<RegionInfo>> e : clusterMap.entrySet()) {
        if (sb.length() > 0) sb.append(", ");
        sb.append(e.getKey().toString());
        sb.append(" ");
        sb.append(e.getValue().size());
      }
      LOG.warn("Input " + sb.toString());
    }

    // All done!
    LOG.info("Done. Calculated a load balance in " + (endTime - startTime) + "ms. " +
        "Moving " + totalNumMoved + " regions off of " +
        serversOverloaded + " overloaded servers onto " +
        serversUnderloaded + " less loaded servers");

    return regionsToReturn;
  }

  /**
   * If we need to balance overall, we need one more round to peel off one region from each
   * server holding the max count. Together with other regions left to be assigned, we distribute
   * all of regionsToMove to the RegionServers that have fewer regions in whole-cluster scope.
   */
  public void balanceOverall(List<RegionPlan> regionsToReturn,
      Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
      MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min) {
    // Step 1.
    // A map to record the plans we already have as the status quo, in order to resolve a cyclic
    // assignment pair, e.g. plan 1: A -> B, plan 2: B -> C => resolve plan 1 to A -> C, remove plan 2
    Map<ServerName, List<Integer>> returnMap = new HashMap<>();
    for (int i = 0; i < regionsToReturn.size(); i++) {
      List<Integer> pos = returnMap.get(regionsToReturn.get(i).getDestination());
      if (pos == null) {
        pos = new ArrayList<>();
        returnMap.put(regionsToReturn.get(i).getDestination(), pos);
      }
      pos.add(i);
    }

    // Step 2.
    // Peel off one region from each RS which currently has the max number of regions.
    // Each RS should have either max or min numbers of regions for this table.
    for (int i = 0; i < serverLoadList.size(); i++) {
      ServerAndLoad serverload = serverLoadList.get(i);
      BalanceInfo balanceInfo = serverBalanceInfo.get(serverload.getServerName());
      if (balanceInfo == null) {
        continue;
      }
      setLoad(serverLoadList, i, balanceInfo.getNumRegionsAdded());
      if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() == max) {
        RegionInfo hriToPlan;
        if (balanceInfo.getHriList().isEmpty()) {
          LOG.debug("During balanceOverall, we found " + serverload.getServerName()
                  + " has no RegionInfo, no operation needed");
          continue;
        } else if (balanceInfo.getNextRegionForUnload() >= balanceInfo.getHriList().size()) {
          continue;
        } else {
          hriToPlan = balanceInfo.getHriList().get(balanceInfo.getNextRegionForUnload());
        }
        RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
        regionsToMove.add(maxPlan);
        setLoad(serverLoadList, i, -1);
      } else if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
              || balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min) {
        LOG.warn("Encountered incorrect region counts after calculating the move plan during"
                + " balanceOverall. For this table, " + serverload.getServerName()
                + " originally had " + balanceInfo.getHriList().size() + " regions and "
                + balanceInfo.getNumRegionsAdded() + " regions have been added. Yet, max="
                + max + ", min=" + min + ". Stopping balance for this table"); // should not happen
        return;
      }
    }

    // Step 3.
    // Sort serverLoadList, the ArrayList that holds the overall load of each server.
    // We only need to assign the regionsToMove to
    // the first n = regionsToMove.size() RSes that have the least load.
    serverLoadList.sort(Comparator.comparingInt(ServerAndLoad::getLoad));

    // Step 4.
    // Preparation before assigning out all regionsToMove.
    // We need to remove the plans whose source RS equals the destination RS,
    // since the source RS belongs to the least n loaded RSes.
    int assignLength = regionsToMove.size();
    // A structure that helps map a ServerName to its load and index in serverLoadList
    Map<ServerName, Pair<ServerAndLoad, Integer>> snLoadMap = new HashMap<>();
    for (int i = 0; i < serverLoadList.size(); i++) {
      snLoadMap.put(serverLoadList.get(i).getServerName(), new Pair<>(serverLoadList.get(i), i));
    }
    Pair<ServerAndLoad, Integer> shedLoad;
    // A list to help mark the plans in regionsToMove that should be removed
    List<RegionPlan> planToRemoveList = new ArrayList<>();
    // A structure to record how many times a server becomes the source of a plan in regionsToMove.
    Map<ServerName, Integer> sourceMap = new HashMap<>();
    // We remove one of the plans which would cause the source RS to equal the destination RS.
    // But we should keep in mind that the second plan from such an RS should be kept.
    for (RegionPlan plan : regionsToMove) {
      // the source RS's load and index in serverLoadList
      shedLoad = snLoadMap.get(plan.getSource());
      if (!sourceMap.containsKey(plan.getSource())) sourceMap.put(plan.getSource(), 0);
      sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
      if (shedLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
        planToRemoveList.add(plan);
        // While marked as to be removed, the count should be added back to the source RS
        setLoad(serverLoadList, shedLoad.getSecond(), 1);
      }
    }
    // Remove those marked plans from regionsToMove;
    // we cannot remove them directly while iterating through
    // regionsToMove, due to the fact that regionsToMove is a MinMaxPriorityQueue.
    for (RegionPlan planToRemove : planToRemoveList) {
      regionsToMove.remove(planToRemove);
    }

    // Step 5.
    // We only need to assign the regionsToMove to
    // the first n = regionsToMove.size() of the RSes with the least load.
    // With this strategy adopted, we can gradually achieve the overall balance,
    // while keeping the table level balanced.
    for (int i = 0; i < assignLength; i++) {
      // skip an RS that is also a source; we removed its plan from regionsToMove in the previous step
      if (sourceMap.containsKey(serverLoadList.get(i).getServerName())) continue;
      addRegionPlan(regionsToMove, fetchFromTail,
              serverLoadList.get(i).getServerName(), regionsToReturn);
      setLoad(serverLoadList, i, 1);
      // resolve a possible cyclic assignment pair if we just produced one:
      // e.g. plan 1: A -> B, plan 2: B -> C => resolve plan 1 to A -> C and remove plan 2
      List<Integer> pos = returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
      if (pos != null && pos.size() != 0) {
        regionsToReturn.get(pos.get(pos.size() - 1)).setDestination(
                regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
        pos.remove(pos.size() - 1);
        regionsToReturn.remove(regionsToReturn.size() - 1);
      }
    }
    // Done balancing overall
  }

  /**
   * Add a region from the head or tail of regionsToMove to the list of regions to return.
   */
  private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
      final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
    RegionPlan rp;
    if (!fetchFromTail) {
      rp = regionsToMove.remove();
    } else {
      rp = regionsToMove.removeLast();
    }
    rp.setDestination(sn);
    regionsToReturn.add(rp);
  }

  @Override
  public List<RegionPlan> balanceCluster(TableName tableName,
      Map<ServerName, List<RegionInfo>> clusterState) throws HBaseIOException {
    LOG.debug("Start generating balance plan for table: " + tableName);
    return balanceCluster(clusterState);
  }
}