1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.Comparator;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Random;
28  import java.util.Set;
29  import java.util.TreeMap;
30  import java.util.NavigableMap;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.ClusterStatus;
36  import org.apache.hadoop.hbase.HRegionInfo;
37  import org.apache.hadoop.hbase.RegionLoad;
38  import org.apache.hadoop.hbase.ServerName;
39  import org.apache.hadoop.hbase.master.AssignmentManager;
40  import org.apache.hadoop.hbase.master.LoadBalancer;
41  import org.apache.hadoop.hbase.master.MasterServices;
42  import org.apache.hadoop.hbase.util.Bytes;
43  
44  import com.google.common.base.Joiner;
45  import com.google.common.collect.ArrayListMultimap;
46  import com.google.common.collect.Sets;
47  
48  /**
 * The base class for load balancers. It provides the functions used by
 * {@link AssignmentManager} to assign regions in the edge cases. It doesn't
 * provide an implementation of the actual balancing algorithm.
52   *
53   */
54  public abstract class BaseLoadBalancer implements LoadBalancer {
55  
56    /**
57     * An efficient array based implementation similar to ClusterState for keeping
58     * the status of the cluster in terms of region assignment and distribution.
59     * To be used by LoadBalancers.
60     */
61    protected static class Cluster {
    // Servers, tables and regions are each given a dense integer index by the constructor;
    // the arrays below are addressed by those indices.

    // serverIndex -> server name; when several names share a host:port the constructor keeps
    // the one with the newest start code
    ServerName[] servers;
    // tableIndex -> table name, in the order tables were first encountered
    ArrayList<String> tables;
    // regionIndex -> region
    HRegionInfo[] regions;
    // regionIndex -> loads looked up for that region (entry stays null when none were supplied)
    List<RegionLoad>[] regionLoads;
    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality

    int[][] regionsPerServer;            //serverIndex -> region list
    int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
    int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
    int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
    int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS

    // All server indices; reordered in place by sortServersByRegionCount()
    Integer[] serverIndicesSortedByRegionCount;

    // host:port -> serverIndex
    Map<String, Integer> serversToIndex;
    // table name -> tableIndex
    Map<String, Integer> tablesToIndex;

    // Sizes of the index spaces above
    int numRegions;
    int numServers;
    int numTables;

    // Both counters are maintained by regionMoved()
    int numMovedRegions = 0; //num moved regions from the initial configuration
    int numMovedMetaRegions = 0;       //num of moved regions that are META
86  
87      protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState,  Map<String, List<RegionLoad>> loads,
88          RegionLocationFinder regionFinder) {
89  
90        serversToIndex = new HashMap<String, Integer>();
91        tablesToIndex = new HashMap<String, Integer>();
92        //regionsToIndex = new HashMap<HRegionInfo, Integer>();
93  
94        //TODO: We should get the list of tables from master
95        tables = new ArrayList<String>();
96  
97  
98        numRegions = 0;
99  
100       int serverIndex = 0;
101 
102       // Use servername and port as there can be dead servers in this list. We want everything with
103       // a matching hostname and port to have the same index.
104       for (ServerName sn:clusterState.keySet()) {
105         if (serversToIndex.get(sn.getHostAndPort()) == null) {
106           serversToIndex.put(sn.getHostAndPort(), serverIndex++);
107         }
108       }
109 
110       // Count how many regions there are.
111       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
112         numRegions += entry.getValue().size();
113       }
114 
115       numServers = serversToIndex.size();
116       regionsPerServer = new int[serversToIndex.size()][];
117 
118       servers = new ServerName[numServers];
119       regions = new HRegionInfo[numRegions];
120       regionIndexToServerIndex = new int[numRegions];
121       initialRegionIndexToServerIndex = new int[numRegions];
122       regionIndexToTableIndex = new int[numRegions];
123       regionLoads = new List[numRegions];
124       regionLocations = new int[numRegions][];
125       serverIndicesSortedByRegionCount = new Integer[numServers];
126 
127       int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
128 
129       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
130         serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
131 
132         // keep the servername if this is the first server name for this hostname
133         // or this servername has the newest startcode.
134         if (servers[serverIndex] == null ||
135             servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
136           servers[serverIndex] = entry.getKey();
137         }
138 
139         regionsPerServer[serverIndex] = new int[entry.getValue().size()];
140         serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
141       }
142 
143       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
144         serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
145         regionPerServerIndex = 0;
146 
147         for (HRegionInfo region : entry.getValue()) {
148           String tableName = region.getTableNameAsString();
149           Integer idx = tablesToIndex.get(tableName);
150           if (idx == null) {
151             tables.add(tableName);
152             idx = tableIndex;
153             tablesToIndex.put(tableName, tableIndex++);
154           }
155 
156           regions[regionIndex] = region;
157           regionIndexToServerIndex[regionIndex] = serverIndex;
158           initialRegionIndexToServerIndex[regionIndex] = serverIndex;
159           regionIndexToTableIndex[regionIndex] = idx;
160           regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
161 
162           // region load
163           if (loads != null) {
164             List<RegionLoad> rl = loads.get(region.getRegionNameAsString());
165             // That could have failed if the RegionLoad is using the other regionName
166             if (rl == null) {
167               // Try getting the region load using encoded name.
168               rl = loads.get(region.getEncodedName());
169             }
170             regionLoads[regionIndex] = rl;
171           }
172 
173           if (regionFinder != null) {
174             //region location
175             List<ServerName> loc = regionFinder.getTopBlockLocations(region);
176             regionLocations[regionIndex] = new int[loc.size()];
177             for (int i=0; i < loc.size(); i++) {
178               regionLocations[regionIndex][i] =
179                   loc.get(i) == null ? -1 :
180                     (serversToIndex.get(loc.get(i)) == null ? -1 : serversToIndex.get(loc.get(i)));
181             }
182           }
183 
184           regionIndex++;
185         }
186       }
187 
188       numTables = tables.size();
189       numRegionsPerServerPerTable = new int[numServers][numTables];
190 
191       for (int i = 0; i < numServers; i++) {
192         for (int j = 0; j < numTables; j++) {
193           numRegionsPerServerPerTable[i][j] = 0;
194         }
195       }
196 
197       for (int i=0; i < regionIndexToServerIndex.length; i++) {
198         numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
199       }
200 
201       numMaxRegionsPerTable = new int[numTables];
202       for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
203         for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) {
204           if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
205             numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
206           }
207         }
208       }
209     }
210 
211     public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) {
212       //swap
213       if (rRegion >= 0 && lRegion >= 0) {
214         regionMoved(rRegion, rServer, lServer);
215         regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion);
216         regionMoved(lRegion, lServer, rServer);
217         regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion);
218       } else if (rRegion >= 0) { //move rRegion
219         regionMoved(rRegion, rServer, lServer);
220         regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion);
221         regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion);
222       } else if (lRegion >= 0) { //move lRegion
223         regionMoved(lRegion, lServer, rServer);
224         regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion);
225         regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion);
226       }
227     }
228 
229     /** Region moved out of the server */
230     void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) {
231       regionIndexToServerIndex[regionIndex] = newServerIndex;
232       if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) {
233         numMovedRegions--; //region moved back to original location
234         if (regions[regionIndex].isMetaRegion()) {
235           numMovedMetaRegions--;
236         }
237       } else if (initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) {
238         numMovedRegions++; //region moved from original location
239         if (regions[regionIndex].isMetaRegion()) {
240           numMovedMetaRegions++;
241         }
242       }
243       int tableIndex = regionIndexToTableIndex[regionIndex];
244       numRegionsPerServerPerTable[oldServerIndex][tableIndex]--;
245       numRegionsPerServerPerTable[newServerIndex][tableIndex]++;
246 
247       //check whether this caused maxRegionsPerTable in the new Server to be updated
248       if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
249         numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex];
250       } else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1)
251           == numMaxRegionsPerTable[tableIndex]) {
252         //recompute maxRegionsPerTable since the previous value was coming from the old server
253         for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
254           if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
255             numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
256           }
257         }
258       }
259     }
260 
261     int[] removeRegion(int[] regions, int regionIndex) {
262       //TODO: this maybe costly. Consider using linked lists
263       int[] newRegions = new int[regions.length - 1];
264       int i = 0;
265       for (i = 0; i < regions.length; i++) {
266         if (regions[i] == regionIndex) {
267           break;
268         }
269         newRegions[i] = regions[i];
270       }
271       System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
272       return newRegions;
273     }
274 
275     int[] addRegion(int[] regions, int regionIndex) {
276       int[] newRegions = new int[regions.length + 1];
277       System.arraycopy(regions, 0, newRegions, 0, regions.length);
278       newRegions[newRegions.length - 1] = regionIndex;
279       return newRegions;
280     }
281 
282     int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
283       int i = 0;
284       for (i = 0; i < regions.length; i++) {
285         if (regions[i] == regionIndex) {
286           regions[i] = newRegionIndex;
287           break;
288         }
289       }
290       return regions;
291     }
292 
293     void sortServersByRegionCount() {
294       Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
295     }
296 
297     int getNumRegions(int server) {
298       return regionsPerServer[server].length;
299     }
300 
301     private Comparator<Integer> numRegionsComparator = new Comparator<Integer>() {
302       @Override
303       public int compare(Integer integer, Integer integer2) {
304         return Integer.valueOf(getNumRegions(integer)).compareTo(getNumRegions(integer2));
305       }
306     };
307 
308     @Override
309     public String toString() {
310       String desc = "Cluster{" +
311           "servers=[";
312           for(ServerName sn:servers) {
313              desc += sn.getHostAndPort() + ", ";
314           }
315           desc +=
316           ", serverIndicesSortedByRegionCount="+
317           Arrays.toString(serverIndicesSortedByRegionCount) +
318           ", regionsPerServer=[";
319 
320           for (int[]r:regionsPerServer) {
321             desc += Arrays.toString(r);
322           }
323           desc += "]" +
324           ", numMaxRegionsPerTable=" +
325           Arrays.toString(numMaxRegionsPerTable) +
326           ", numRegions=" +
327           numRegions +
328           ", numServers=" +
329           numServers +
330           ", numTables=" +
331           numTables +
332           ", numMovedRegions=" +
333           numMovedRegions +
334           ", numMovedMetaRegions=" +
335           numMovedMetaRegions +
336           '}';
337       return desc;
338     }
339   }
340 
  // Slop for regions: tolerated deviation from the average load, as a fraction. Read from
  // "hbase.regions.slop" in setConf() and clamped there to the range [0, 1].
  private float slop;
  // Configuration last handed to setConf()
  private Configuration config;
  // Shared random source for the random and round-robin assignment methods
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
  // Set through setMasterServices()
  protected MasterServices services;
347 
348   @Override
349   public void setConf(Configuration conf) {
350     this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
351     if (slop < 0) slop = 0;
352     else if (slop > 1) slop = 1;
353     this.config = conf;
354   }
355 
356   @Override
357   public Configuration getConf() {
358     return this.config;
359   }
360 
  /**
   * Accepts the current cluster status. This base implementation ignores it.
   *
   * @param st current cluster status (unused here)
   */
  public void setClusterStatus(ClusterStatus st) {
    // Not used except for the stochastic balancer
  }
364 
365   public void setMasterServices(MasterServices masterServices) {
366     this.services = masterServices;
367   }
368 
369   protected boolean needsBalance(ClusterLoadState cs) {
370     if (cs.getNumServers() == 0) {
371       LOG.debug("numServers=0 so skipping load balancing");
372       return false;
373     }
374     // Check if we even need to do any load balancing
375     // HBASE-3681 check sloppiness first
376     float average = cs.getLoadAverage(); // for logging
377     int floor = (int) Math.floor(average * (1 - slop));
378     int ceiling = (int) Math.ceil(average * (1 + slop));
379     if (!(cs.getMinLoad() > ceiling || cs.getMaxLoad() < floor)) {
380       NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
381       if (LOG.isTraceEnabled()) {
382         // If nothing to balance, then don't say anything unless trace-level logging.
383         LOG.trace("Skipping load balancing because balanced cluster; " +
384           "servers=" + cs.getNumServers() + " " +
385           "regions=" + cs.getNumRegions() + " average=" + average + " " +
386           "mostloaded=" + serversByLoad.lastKey().getLoad() +
387           " leastloaded=" + serversByLoad.firstKey().getLoad());
388       }
389       return false;
390     }
391     return true;
392   }
393 
394   /**
395    * Generates a bulk assignment plan to be used on cluster startup using a
396    * simple round-robin assignment.
397    * <p>
398    * Takes a list of all the regions and all the servers in the cluster and
399    * returns a map of each server to the regions that it should be assigned.
400    * <p>
401    * Currently implemented as a round-robin assignment. Same invariant as load
402    * balancing, all servers holding floor(avg) or ceiling(avg).
403    *
404    * TODO: Use block locations from HDFS to place regions with their blocks
405    *
406    * @param regions all regions
407    * @param servers all servers
408    * @return map of server to the regions it should take, or null if no
409    *         assignment is possible (ie. no regions or no servers)
410    */
411   public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
412       List<ServerName> servers) {
413     if (regions.isEmpty() || servers.isEmpty()) {
414       return null;
415     }
416     Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
417     int numRegions = regions.size();
418     int numServers = servers.size();
419     int max = (int) Math.ceil((float) numRegions / numServers);
420     int serverIdx = 0;
421     if (numServers > 1) {
422       serverIdx = RANDOM.nextInt(numServers);
423     }
424     int regionIdx = 0;
425     for (int j = 0; j < numServers; j++) {
426       ServerName server = servers.get((j + serverIdx) % numServers);
427       List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
428       for (int i = regionIdx; i < numRegions; i += numServers) {
429         serverRegions.add(regions.get(i % numRegions));
430       }
431       assignments.put(server, serverRegions);
432       regionIdx++;
433     }
434     return assignments;
435   }
436 
437   /**
438    * Generates an immediate assignment plan to be used by a new master for
439    * regions in transition that do not have an already known destination.
440    *
441    * Takes a list of regions that need immediate assignment and a list of all
442    * available servers. Returns a map of regions to the server they should be
443    * assigned to.
444    *
445    * This method will return quickly and does not do any intelligent balancing.
446    * The goal is to make a fast decision not the best decision possible.
447    *
448    * Currently this is random.
449    *
450    * @param regions
451    * @param servers
452    * @return map of regions to the server it should be assigned to
453    */
454   public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
455       List<ServerName> servers) {
456     Map<HRegionInfo, ServerName> assignments = new TreeMap<HRegionInfo, ServerName>();
457     for (HRegionInfo region : regions) {
458       assignments.put(region, randomAssignment(region, servers));
459     }
460     return assignments;
461   }
462 
463   /**
464    * Used to assign a single region to a random server.
465    */
466   public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
467     if (servers == null || servers.isEmpty()) {
468       LOG.warn("Wanted to do random assignment but no servers to assign to");
469       return null;
470     }
471     return servers.get(RANDOM.nextInt(servers.size()));
472   }
473 
474   /**
475    * Generates a bulk assignment startup plan, attempting to reuse the existing
476    * assignment information from META, but adjusting for the specified list of
477    * available/online servers available for assignment.
478    * <p>
479    * Takes a map of all regions to their existing assignment from META. Also
480    * takes a list of online servers for regions to be assigned to. Attempts to
481    * retain all assignment, so in some instances initial assignment will not be
482    * completely balanced.
483    * <p>
484    * Any leftover regions without an existing server to be assigned to will be
485    * assigned randomly to available servers.
486    *
487    * @param regions regions and existing assignment from meta
488    * @param servers available servers
489    * @return map of servers and regions to be assigned to them
490    */
491   public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
492       List<ServerName> servers) {
493     // Group all of the old assignments by their hostname.
494     // We can't group directly by ServerName since the servers all have
495     // new start-codes.
496 
497     // Group the servers by their hostname. It's possible we have multiple
498     // servers on the same host on different ports.
499     ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
500     for (ServerName server : servers) {
501       serversByHostname.put(server.getHostname(), server);
502     }
503 
504     // Now come up with new assignments
505     Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
506 
507     for (ServerName server : servers) {
508       assignments.put(server, new ArrayList<HRegionInfo>());
509     }
510 
511     // Collection of the hostnames that used to have regions
512     // assigned, but for which we no longer have any RS running
513     // after the cluster restart.
514     Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
515 
516     int numRandomAssignments = 0;
517     int numRetainedAssigments = 0;
518     for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
519       HRegionInfo region = entry.getKey();
520       ServerName oldServerName = entry.getValue();
521       List<ServerName> localServers = new ArrayList<ServerName>();
522       if (oldServerName != null) {
523         localServers = serversByHostname.get(oldServerName.getHostname());
524       }
525       if (localServers.isEmpty()) {
526         // No servers on the new cluster match up with this hostname,
527         // assign randomly.
528         ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
529         assignments.get(randomServer).add(region);
530         numRandomAssignments++;
531         if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
532       } else if (localServers.size() == 1) {
533         // the usual case - one new server on same host
534         assignments.get(localServers.get(0)).add(region);
535         numRetainedAssigments++;
536       } else {
537         // multiple new servers in the cluster on this same host
538         int size = localServers.size();
539         ServerName target = localServers.get(RANDOM.nextInt(size));
540         assignments.get(target).add(region);
541         numRetainedAssigments++;
542       }
543     }
544 
545     String randomAssignMsg = "";
546     if (numRandomAssignments > 0) {
547       randomAssignMsg =
548           numRandomAssignments + " regions were assigned "
549               + "to random hosts, since the old hosts for these regions are no "
550               + "longer present in the cluster. These hosts were:\n  "
551               + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
552     }
553 
554     LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
555         + " retained the pre-restart assignment. " + randomAssignMsg);
556     return assignments;
557   }
558 
559 }