View Javadoc

1   /**
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.Comparator;
29  import java.util.HashMap;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableMap;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.TreeMap;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.fs.FileSystem;
41  import org.apache.hadoop.hbase.ClusterStatus;
42  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.ServerName;
46  import org.apache.hadoop.hbase.TableExistsException;
47  import org.apache.hadoop.hbase.regionserver.HRegion;
48  import org.apache.hadoop.hbase.util.Bytes;
49  
50  import com.google.common.base.Joiner;
51  import com.google.common.collect.ArrayListMultimap;
52  import com.google.common.collect.MinMaxPriorityQueue;
53  import com.google.common.collect.Sets;
54  
55  /**
56   * Makes decisions about the placement and movement of Regions across
57   * RegionServers.
58   *
59   * <p>Cluster-wide load balancing will occur only when there are no regions in
60   * transition and according to a fixed period of time using {@link #balanceCluster(Map)}.
61   *
62   * <p>Inline region placement with {@link #immediateAssignment} can be used when
63   * the Master needs to handle closed regions that it currently does not have
64   * a destination set for.  This can happen during master failover.
65   *
66   * <p>On cluster startup, bulk assignment can be used to determine
67   * locations for all Regions in a cluster.
68   *
69   * <p>This class produces plans for the {@link AssignmentManager} to execute.
70   */
71  public class DefaultLoadBalancer implements LoadBalancer {
72    private static final Log LOG = LogFactory.getLog(LoadBalancer.class);
73    private static final Random RANDOM = new Random(System.currentTimeMillis());
74    // slop for regions
75    private float slop;
76    private Configuration config;
77    private ClusterStatus status;
78    private MasterServices services;
79  
80    public void setClusterStatus(ClusterStatus st) {
81      this.status = st;
82    }
83  
84    public void setMasterServices(MasterServices masterServices) {
85      this.services = masterServices;
86    }
87  
88    @Override
89    public void setConf(Configuration conf) {
90      this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
91      if (slop < 0) slop = 0;
92      else if (slop > 1) slop = 1;
93      this.config = conf;
94    }
95  
96    @Override
97    public Configuration getConf() {
98      return this.config;
99    }
100 
101   /*
102   * The following comparator assumes that RegionId from HRegionInfo can
103   * represent the age of the region - larger RegionId means the region
104   * is younger.
105   * This comparator is used in balanceCluster() to account for the out-of-band
106   * regions which were assigned to the server after some other region server
107   * crashed.
108   */
109    private static class RegionInfoComparator implements Comparator<HRegionInfo> {
110        @Override
111        public int compare(HRegionInfo l, HRegionInfo r) {
112           long diff = r.getRegionId() - l.getRegionId();
113           if (diff < 0) return -1;
114           if (diff > 0) return 1;
115           return 0;
116        }
117    }
118 
119 
120    RegionInfoComparator riComparator = new RegionInfoComparator();
121    
122    private class RegionPlanComparator implements Comparator<RegionPlan> {
123     @Override
124     public int compare(RegionPlan l, RegionPlan r) {
125       long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId();
126       if (diff < 0) return -1;
127       if (diff > 0) return 1;
128       return 0;
129     }
130   }
131 
132   RegionPlanComparator rpComparator = new RegionPlanComparator();
133 
134   /**
135    * Generate a global load balancing plan according to the specified map of
136    * server information to the most loaded regions of each server.
137    *
138    * The load balancing invariant is that all servers are within 1 region of the
139    * average number of regions per server.  If the average is an integer number,
140    * all servers will be balanced to the average.  Otherwise, all servers will
141    * have either floor(average) or ceiling(average) regions.
142    *
143    * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
144    *   we can fetch from both ends of the queue. 
145    * At the beginning, we check whether there was empty region server 
146    *   just discovered by Master. If so, we alternately choose new / old
147    *   regions from head / tail of regionsToMove, respectively. This alternation
148    *   avoids clustering young regions on the newly discovered region server.
149    *   Otherwise, we choose new regions from head of regionsToMove.
150    *   
151    * Another improvement from HBASE-3609 is that we assign regions from
152    *   regionsToMove to underloaded servers in round-robin fashion.
153    *   Previously one underloaded server would be filled before we move onto
154    *   the next underloaded server, leading to clustering of young regions.
155    *   
156    * Finally, we randomly shuffle underloaded servers so that they receive
157    *   offloaded regions relatively evenly across calls to balanceCluster().
158    *         
159    * The algorithm is currently implemented as such:
160    *
161    * <ol>
162    * <li>Determine the two valid numbers of regions each server should have,
163    *     <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
164    *
165    * <li>Iterate down the most loaded servers, shedding regions from each so
166    *     each server hosts exactly <b>MAX</b> regions.  Stop once you reach a
167    *     server that already has &lt;= <b>MAX</b> regions.
168    *     <p>
169    *     Order the regions to move from most recent to least.
170    *
171    * <li>Iterate down the least loaded servers, assigning regions so each server
172    *     has exactly </b>MIN</b> regions.  Stop once you reach a server that
173    *     already has &gt;= <b>MIN</b> regions.
174    *
175    *     Regions being assigned to underloaded servers are those that were shed
176    *     in the previous step.  It is possible that there were not enough
177    *     regions shed to fill each underloaded server to <b>MIN</b>.  If so we
178    *     end up with a number of regions required to do so, <b>neededRegions</b>.
179    *
180    *     It is also possible that we were able to fill each underloaded but ended
181    *     up with regions that were unassigned from overloaded servers but that
182    *     still do not have assignment.
183    *
184    *     If neither of these conditions hold (no regions needed to fill the
185    *     underloaded servers, no regions leftover from overloaded servers),
186    *     we are done and return.  Otherwise we handle these cases below.
187    *
188    * <li>If <b>neededRegions</b> is non-zero (still have underloaded servers),
189    *     we iterate the most loaded servers again, shedding a single server from
190    *     each (this brings them from having <b>MAX</b> regions to having
191    *     <b>MIN</b> regions).
192    *
193    * <li>We now definitely have more regions that need assignment, either from
194    *     the previous step or from the original shedding from overloaded servers.
195    *     Iterate the least loaded servers filling each to <b>MIN</b>.
196    *
197    * <li>If we still have more regions that need assignment, again iterate the
198    *     least loaded servers, this time giving each one (filling them to
199    *     </b>MAX</b>) until we run out.
200    *
201    * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions.
202    *
203    *     In addition, any server hosting &gt;= <b>MAX</b> regions is guaranteed
204    *     to end up with <b>MAX</b> regions at the end of the balancing.  This
205    *     ensures the minimal number of regions possible are moved.
206    * </ol>
207    *
208    * TODO: We can at-most reassign the number of regions away from a particular
209    *       server to be how many they report as most loaded.
210    *       Should we just keep all assignment in memory?  Any objections?
211    *       Does this mean we need HeapSize on HMaster?  Or just careful monitor?
212    *       (current thinking is we will hold all assignments in memory)
213    *
214    * @param clusterState Map of regionservers and their load/region information to
215    *                   a list of their most loaded regions
216    * @return a list of regions to be moved, including source and destination,
217    *         or null if cluster is already balanced
218    */
219   public List<RegionPlan> balanceCluster(
220       Map<ServerName, List<HRegionInfo>> clusterState) {
221     boolean emptyRegionServerPresent = false;
222     long startTime = System.currentTimeMillis();
223 
224     int numServers = clusterState.size();
225     if (numServers == 0) {
226       LOG.debug("numServers=0 so skipping load balancing");
227       return null;
228     }
229     NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad =
230       new TreeMap<ServerAndLoad, List<HRegionInfo>>();
231     int numRegions = 0;
232     int maxRegionCountPerServer = 0;
233     // Iterate so we can count regions as we build the map
234     for (Map.Entry<ServerName, List<HRegionInfo>> server: clusterState.entrySet()) {
235       List<HRegionInfo> regions = server.getValue();
236       int sz = regions.size();
237       if (sz == 0) emptyRegionServerPresent = true;
238       numRegions += sz;
239       if (maxRegionCountPerServer < sz) maxRegionCountPerServer = sz;
240       serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions);
241     }
242     // Check if we even need to do any load balancing
243     float average = (float)numRegions / numServers; // for logging
244     // HBASE-3681 check sloppiness first
245     int floor = (int) Math.floor(average * (1 - slop));
246     int ceiling = (int) Math.ceil(average * (1 + slop));
247     if (serversByLoad.lastKey().getLoad() <= ceiling &&
248        serversByLoad.firstKey().getLoad() >= floor) {
249       // Skipped because no server outside (min,max) range
250       LOG.info("Skipping load balancing because balanced cluster; " +
251         "servers=" + numServers + " " +
252         "regions=" + numRegions + " average=" + average + " " +
253         "mostloaded=" + serversByLoad.lastKey().getLoad() +
254         " leastloaded=" + serversByLoad.firstKey().getLoad());
255       return null;
256     }
257     int min = numRegions / numServers;
258     int max = numRegions % numServers == 0 ? min : min + 1;
259     if (maxRegionCountPerServer == 1) return null; // table is balanced
260 
261     // Using to check balance result.
262     StringBuilder strBalanceParam = new StringBuilder();
263     strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
264         .append(", numServers=").append(numServers).append(", max=").append(max)
265         .append(", min=").append(min);
266     LOG.debug(strBalanceParam.toString());
267 
268     // Balance the cluster
269     // TODO: Look at data block locality or a more complex load to do this
270     MinMaxPriorityQueue<RegionPlan> regionsToMove =
271       MinMaxPriorityQueue.orderedBy(rpComparator).create();
272     List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
273 
274     // Walk down most loaded, pruning each to the max
275     int serversOverloaded = 0;
276     // flag used to fetch regions from head and tail of list, alternately
277     boolean fetchFromTail = false;
278     Map<ServerName, BalanceInfo> serverBalanceInfo =
279       new TreeMap<ServerName, BalanceInfo>();
280     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
281         serversByLoad.descendingMap().entrySet()) {
282       ServerAndLoad sal = server.getKey();
283       int regionCount = sal.getLoad();
284       if (regionCount <= max) {
285         serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
286         break;
287       }
288       serversOverloaded++;
289       List<HRegionInfo> regions = server.getValue();
290       int numToOffload = Math.min(regionCount - max, regions.size());
291       // account for the out-of-band regions which were assigned to this server
292       // after some other region server crashed 
293       Collections.sort(regions, riComparator);
294       int numTaken = 0;
295       for (int i = 0; i <= numToOffload; ) {
296         HRegionInfo hri = regions.get(i); // fetch from head
297         if (fetchFromTail) {
298           hri = regions.get(regions.size() - 1 - i);
299         }
300         i++;
301         // Don't rebalance meta regions.
302         if (hri.isMetaRegion()) continue;
303         regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
304         numTaken++;
305         if (numTaken >= numToOffload) break;
306         // fetch in alternate order if there is new region server
307         if (emptyRegionServerPresent) {
308           fetchFromTail = !fetchFromTail;
309         }
310       }
311       serverBalanceInfo.put(sal.getServerName(),
312         new BalanceInfo(numToOffload, (-1)*numTaken));
313     }
314     int totalNumMoved = regionsToMove.size();
315 
316     // Walk down least loaded, filling each to the min
317     int neededRegions = 0; // number of regions needed to bring all up to min
318     fetchFromTail = false;
319 
320     Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
321     int maxToTake = numRegions - (int)average;
322     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
323         serversByLoad.entrySet()) {
324       if (maxToTake == 0) break; // no more to take
325       int regionCount = server.getKey().getLoad();
326       if (regionCount >= min && regionCount > 0) {
327         continue; // look for other servers which haven't reached min
328       }
329       int regionsToPut = min - regionCount;
330       if (regionsToPut == 0)
331       {
332         regionsToPut = 1;
333       }
334       maxToTake -= regionsToPut;
335       underloadedServers.put(server.getKey().getServerName(), regionsToPut);
336     }
337     // number of servers that get new regions
338     int serversUnderloaded = underloadedServers.size();
339     int incr = 1;
340     List<ServerName> sns =
341       Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
342     Collections.shuffle(sns, RANDOM);
343     while (regionsToMove.size() > 0) {
344       int cnt = 0;
345       int i = incr > 0 ? 0 : underloadedServers.size()-1;
346       for (; i >= 0 && i < underloadedServers.size(); i += incr) {
347         if (regionsToMove.isEmpty()) break;
348         ServerName si = sns.get(i);
349         int numToTake = underloadedServers.get(si);
350         if (numToTake == 0) continue;
351 
352         addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
353         if (emptyRegionServerPresent) {
354           fetchFromTail = !fetchFromTail;
355         }
356 
357         underloadedServers.put(si, numToTake-1);
358         cnt++;
359         BalanceInfo bi = serverBalanceInfo.get(si);
360         if (bi == null) {
361           bi = new BalanceInfo(0, 0);
362           serverBalanceInfo.put(si, bi);
363         }
364         bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
365       }
366       if (cnt == 0) break;
367       // iterates underloadedServers in the other direction
368       incr = -incr;
369     }
370     for (Integer i : underloadedServers.values()) {
371       // If we still want to take some, increment needed
372       neededRegions += i;
373     }
374 
375     // If none needed to fill all to min and none left to drain all to max,
376     // we are done
377     if (neededRegions == 0 && regionsToMove.isEmpty()) {
378       long endTime = System.currentTimeMillis();
379       LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
380           "Moving " + totalNumMoved + " regions off of " +
381           serversOverloaded + " overloaded servers onto " +
382           serversUnderloaded + " less loaded servers");
383       return regionsToReturn;
384     }
385 
386     // Need to do a second pass.
387     // Either more regions to assign out or servers that are still underloaded
388 
389     // If we need more to fill min, grab one from each most loaded until enough
390     if (neededRegions != 0) {
391       // Walk down most loaded, grabbing one from each until we get enough
392       for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
393         serversByLoad.descendingMap().entrySet()) {
394         BalanceInfo balanceInfo =
395           serverBalanceInfo.get(server.getKey().getServerName());
396         int idx =
397           balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
398         if (idx >= server.getValue().size()) break;
399         HRegionInfo region = server.getValue().get(idx);
400         if (region.isMetaRegion()) continue; // Don't move meta regions.
401         regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
402         totalNumMoved++;
403         if (--neededRegions == 0) {
404           // No more regions needed, done shedding
405           break;
406         }
407       }
408     }
409 
410     // Now we have a set of regions that must be all assigned out
411     // Assign each underloaded up to the min, then if leftovers, assign to max
412 
413     // Walk down least loaded, assigning to each to fill up to min
414     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
415         serversByLoad.entrySet()) {
416       int regionCount = server.getKey().getLoad();
417       if (regionCount >= min) break;
418       BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
419       if(balanceInfo != null) {
420         regionCount += balanceInfo.getNumRegionsAdded();
421       }
422       if(regionCount >= min) {
423         continue;
424       }
425       int numToTake = min - regionCount;
426       int numTaken = 0;
427       while(numTaken < numToTake && 0 < regionsToMove.size()) {
428         addRegionPlan(regionsToMove, fetchFromTail,
429           server.getKey().getServerName(), regionsToReturn);
430         numTaken++;
431         if (emptyRegionServerPresent) {
432           fetchFromTail = !fetchFromTail;
433         }
434       }
435     }
436 
437     // If we still have regions to dish out, assign underloaded to max
438     if (0 < regionsToMove.size()) {
439       for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
440         serversByLoad.entrySet()) {
441         int regionCount = server.getKey().getLoad();
442         BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
443         if(balanceInfo != null) {
444           regionCount += balanceInfo.getNumRegionsAdded();
445         }
446         if(regionCount >= max) {
447           break;
448         }
449         addRegionPlan(regionsToMove, fetchFromTail,
450           server.getKey().getServerName(), regionsToReturn);
451         if (emptyRegionServerPresent) {
452           fetchFromTail = !fetchFromTail;
453         }
454         if (regionsToMove.isEmpty()) {
455           break;
456         }
457       }
458     }
459 
460     long endTime = System.currentTimeMillis();
461 
462     if (!regionsToMove.isEmpty() || neededRegions != 0) {
463       // Emit data so can diagnose how balancer went astray.
464       LOG.warn("regionsToMove=" + totalNumMoved +
465         ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
466         ", serversUnderloaded=" + serversUnderloaded);
467       StringBuilder sb = new StringBuilder();
468       for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterState.entrySet()) {
469         if (sb.length() > 0) sb.append(", ");
470         sb.append(e.getKey().toString());
471         sb.append(" ");
472         sb.append(e.getValue().size());
473       }
474       LOG.warn("Input " + sb.toString());
475     }
476 
477     // All done!
478     LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
479         "Moving " + totalNumMoved + " regions off of " +
480         serversOverloaded + " overloaded servers onto " +
481         serversUnderloaded + " less loaded servers");
482 
483     return regionsToReturn;
484   }
485 
486   /**
487    * Add a region from the head or tail to the List of regions to return.
488    */
489   void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
490       final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
491     RegionPlan rp = null;
492     if (!fetchFromTail) rp = regionsToMove.remove();
493     else rp = regionsToMove.removeLast();
494     rp.setDestination(sn);
495     regionsToReturn.add(rp);
496   }
497 
498   /**
499    * Stores additional per-server information about the regions added/removed
500    * during the run of the balancing algorithm.
501    *
502    * For servers that shed regions, we need to track which regions we have
503    * already shed.  <b>nextRegionForUnload</b> contains the index in the list
504    * of regions on the server that is the next to be shed.
505    */
506   private static class BalanceInfo {
507 
508     private final int nextRegionForUnload;
509     private int numRegionsAdded;
510 
511     public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
512       this.nextRegionForUnload = nextRegionForUnload;
513       this.numRegionsAdded = numRegionsAdded;
514     }
515 
516     public int getNextRegionForUnload() {
517       return nextRegionForUnload;
518     }
519 
520     public int getNumRegionsAdded() {
521       return numRegionsAdded;
522     }
523 
524     public void setNumRegionsAdded(int numAdded) {
525       this.numRegionsAdded = numAdded;
526     }
527   }
528 
529   /**
530    * Generates a bulk assignment plan to be used on cluster startup using a
531    * simple round-robin assignment.
532    * <p>
533    * Takes a list of all the regions and all the servers in the cluster and
534    * returns a map of each server to the regions that it should be assigned.
535    * <p>
536    * Currently implemented as a round-robin assignment.  Same invariant as
537    * load balancing, all servers holding floor(avg) or ceiling(avg).
538    *
539    * TODO: Use block locations from HDFS to place regions with their blocks
540    *
541    * @param regions all regions
542    * @param servers all servers
543    * @return map of server to the regions it should take, or null if no
544    *         assignment is possible (ie. no regions or no servers)
545    */
546   public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
547       List<HRegionInfo> regions, List<ServerName> servers) {
548     if (regions.isEmpty() || servers.isEmpty()) {
549       return null;
550     }
551     Map<ServerName, List<HRegionInfo>> assignments =
552       new TreeMap<ServerName,List<HRegionInfo>>();
553     int numRegions = regions.size();
554     int numServers = servers.size();
555     int max = (int)Math.ceil((float)numRegions/numServers);
556     int serverIdx = 0;
557     if (numServers > 1) {
558       serverIdx = RANDOM.nextInt(numServers);
559     }
560     int regionIdx = 0;
561     for (int j = 0; j < numServers; j++) {
562       ServerName server = servers.get((j + serverIdx) % numServers);
563       List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
564       for (int i=regionIdx; i<numRegions; i += numServers) {
565         serverRegions.add(regions.get(i % numRegions));
566       }
567       assignments.put(server, serverRegions);
568       regionIdx++;
569     }
570     return assignments;
571   }
572 
573   /**
574    * Generates a bulk assignment startup plan, attempting to reuse the existing
575    * assignment information from META, but adjusting for the specified list of
576    * available/online servers available for assignment.
577    * <p>
578    * Takes a map of all regions to their existing assignment from META.  Also
579    * takes a list of online servers for regions to be assigned to.  Attempts to
580    * retain all assignment, so in some instances initial assignment will not be
581    * completely balanced.
582    * <p>
583    * Any leftover regions without an existing server to be assigned to will be
584    * assigned randomly to available servers.
585    * @param regions regions and existing assignment from meta
586    * @param servers available servers
587    * @return map of servers and regions to be assigned to them
588    */
589   public Map<ServerName, List<HRegionInfo>> retainAssignment(
590       Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
591     // Group all of the old assignments by their hostname.
592     // We can't group directly by ServerName since the servers all have
593     // new start-codes.
594     
595     // Group the servers by their hostname. It's possible we have multiple
596     // servers on the same host on different ports.
597     ArrayListMultimap<String, ServerName> serversByHostname =
598         ArrayListMultimap.create();
599     for (ServerName server : servers) {
600       serversByHostname.put(server.getHostname(), server);
601     }
602     
603     // Now come up with new assignments
604     Map<ServerName, List<HRegionInfo>> assignments =
605       new TreeMap<ServerName, List<HRegionInfo>>();
606     
607     for (ServerName server : servers) {
608       assignments.put(server, new ArrayList<HRegionInfo>());
609     }
610     
611     // Collection of the hostnames that used to have regions
612     // assigned, but for which we no longer have any RS running
613     // after the cluster restart.
614     Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
615     
616     int numRandomAssignments = 0;
617     int numRetainedAssigments = 0;
618     for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
619       HRegionInfo region = entry.getKey();
620       ServerName oldServerName = entry.getValue();
621       List<ServerName> localServers = new ArrayList<ServerName>();
622       if (oldServerName != null) {
623         localServers = serversByHostname.get(oldServerName.getHostname());
624       }
625       if (localServers.isEmpty()) {
626         // No servers on the new cluster match up with this hostname,
627         // assign randomly.
628         ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
629         assignments.get(randomServer).add(region);
630         numRandomAssignments++;
631         if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
632       } else if (localServers.size() == 1) {
633         // the usual case - one new server on same host
634         assignments.get(localServers.get(0)).add(region);
635         numRetainedAssigments++;
636       } else {
637         // multiple new servers in the cluster on this same host
638         int size = localServers.size();
639         ServerName target = localServers.get(RANDOM.nextInt(size));
640         assignments.get(target).add(region);
641         numRetainedAssigments++;
642       }
643     }
644     
645     String randomAssignMsg = "";
646     if (numRandomAssignments > 0) {
647       randomAssignMsg = numRandomAssignments + " regions were assigned " +
648       		"to random hosts, since the old hosts for these regions are no " +
649       		"longer present in the cluster. These hosts were:\n  " +
650           Joiner.on("\n  ").join(oldHostsNoLongerPresent);
651     }
652     
653     LOG.info("Reassigned " + regions.size() + " regions. " +
654         numRetainedAssigments + " retained the pre-restart assignment. " +
655         randomAssignMsg);
656     return assignments;
657   }
658 
659   /**
660    * Returns an ordered list of hosts that are hosting the blocks for this
661    * region.  The weight of each host is the sum of the block lengths of all
662    * files on that host, so the first host in the list is the server which
663    * holds the most bytes of the given region's HFiles.
664    *
665    * @param fs the filesystem
666    * @param region region
667    * @return ordered list of hosts holding blocks of the specified region
668    */
669   @SuppressWarnings("unused")
670   private List<ServerName> getTopBlockLocations(FileSystem fs,
671     HRegionInfo region) {
672     List<ServerName> topServerNames = null;
673     try {
674       HTableDescriptor tableDescriptor = getTableDescriptor(
675         region.getTableName());
676       if (tableDescriptor != null) {
677         HDFSBlocksDistribution blocksDistribution =
678           HRegion.computeHDFSBlocksDistribution(config, tableDescriptor,
679           region.getEncodedName());
680         List<String> topHosts = blocksDistribution.getTopHosts();
681         topServerNames = mapHostNameToServerName(topHosts);
682       }
683     } catch (IOException ioe) {
684       LOG.debug("IOException during HDFSBlocksDistribution computation. for " +
685         "region = " + region.getEncodedName() , ioe);
686     }
687     
688     return topServerNames;
689   }
690 
691   /**
692    * return HTableDescriptor for a given tableName
693    * @param tableName the table name
694    * @return HTableDescriptor
695    * @throws IOException
696    */
697   private HTableDescriptor getTableDescriptor(byte[] tableName)
698     throws IOException {
699     HTableDescriptor tableDescriptor = null;
700     try {
701       if ( this.services != null)
702       {
703         tableDescriptor = this.services.getTableDescriptors().
704           get(Bytes.toString(tableName));
705       }
706     } catch (FileNotFoundException fnfe) {
707       LOG.debug("FileNotFoundException during getTableDescriptors." +
708         " Current table name = " + tableName , fnfe);
709     }
710 
711     return tableDescriptor;
712   }
713 
714   /**
715    * Map hostname to ServerName, The output ServerName list will have the same
716    * order as input hosts.
717    * @param hosts the list of hosts
718    * @return ServerName list
719    */  
720   private List<ServerName> mapHostNameToServerName(List<String> hosts) {
721     if ( hosts == null || status == null) {
722       return null;
723     }
724 
725     List<ServerName> topServerNames = new ArrayList<ServerName>();
726     Collection<ServerName> regionServers = status.getServers();
727 
728     // create a mapping from hostname to ServerName for fast lookup
729     HashMap<String, ServerName> hostToServerName =
730       new HashMap<String, ServerName>();
731     for (ServerName sn : regionServers) {
732       hostToServerName.put(sn.getHostname(), sn);
733         }
734 
735     for (String host : hosts ) {
736       ServerName sn = hostToServerName.get(host);
737       // it is possible that HDFS is up ( thus host is valid ),
738       // but RS is down ( thus sn is null )
739       if (sn != null) {
740         topServerNames.add(sn);
741       }
742     }
743     return topServerNames;
744   }
745 
746 
747   /**
748    * Generates an immediate assignment plan to be used by a new master for
749    * regions in transition that do not have an already known destination.
750    *
751    * Takes a list of regions that need immediate assignment and a list of
752    * all available servers.  Returns a map of regions to the server they
753    * should be assigned to.
754    *
755    * This method will return quickly and does not do any intelligent
756    * balancing.  The goal is to make a fast decision not the best decision
757    * possible.
758    *
759    * Currently this is random.
760    *
761    * @param regions
762    * @param servers
763    * @return map of regions to the server it should be assigned to
764    */
765   public Map<HRegionInfo, ServerName> immediateAssignment(
766       List<HRegionInfo> regions, List<ServerName> servers) {
767     Map<HRegionInfo,ServerName> assignments =
768       new TreeMap<HRegionInfo,ServerName>();
769     for(HRegionInfo region : regions) {
770       assignments.put(region, servers.get(RANDOM.nextInt(servers.size())));
771     }
772     return assignments;
773   }
774 
775   public ServerName randomAssignment(List<ServerName> servers) {
776     if (servers == null || servers.isEmpty()) {
777       LOG.warn("Wanted to do random assignment but no servers to assign to");
778       return null;
779     }
780     return servers.get(RANDOM.nextInt(servers.size()));
781   }
782 
783 }