1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayList;
21  import java.util.HashMap;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Map.Entry;
25  import java.util.Random;
26  import java.util.Set;
27  import java.util.TreeMap;
28  import java.util.NavigableMap;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.ClusterStatus;
34  import org.apache.hadoop.hbase.HRegionInfo;
35  import org.apache.hadoop.hbase.RegionLoad;
36  import org.apache.hadoop.hbase.ServerName;
37  import org.apache.hadoop.hbase.master.AssignmentManager;
38  import org.apache.hadoop.hbase.master.LoadBalancer;
39  import org.apache.hadoop.hbase.master.MasterServices;
40  import org.apache.hadoop.hbase.util.Bytes;
41  
42  import com.google.common.base.Joiner;
43  import com.google.common.collect.ArrayListMultimap;
44  import com.google.common.collect.Sets;
45  
46  /**
 * The base class for load balancers. It provides the functions used by
48   * {@link AssignmentManager} to assign regions in the edge cases. It doesn't
49   * provide an implementation of the actual balancing algorithm.
50   *
51   */
52  public abstract class BaseLoadBalancer implements LoadBalancer {
53  
54    /**
55     * An efficient array based implementation similar to ClusterState for keeping
56     * the status of the cluster in terms of region assignment and distribution.
57     * To be used by LoadBalancers.
58     */
59    protected static class Cluster {
    // The constructor gives every server, table and region a dense integer index;
    // the arrays below are keyed by those indices.
    ServerName[] servers; //serverIndex -> server
    ArrayList<byte[]> tables; //tableIndex -> table name, in order of first appearance
    HRegionInfo[] regions; //regionIndex -> region
    // regionIndex -> load entries found in the loads map given to the constructor
    // (null when nothing was found or no loads map was supplied)
    List<RegionLoad>[] regionLoads;
    // Entries are -1 for locations that are null or not part of this cluster; the
    // per-region array stays null when no RegionLocationFinder was supplied.
    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality

    int[][] regionsPerServer;            //serverIndex -> region list
    int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
    int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
    int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
    int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS

    Map<ServerName, Integer> serversToIndex; //server -> serverIndex
    // Keyed by Bytes.mapKey(tableName), i.e. a hash of the table name rather than the name itself.
    Map<Integer, Integer> tablesToIndex; //table name hash -> tableIndex

    int numRegions; //total regions over all servers
    int numServers; //number of entries in the cluster state map
    int numTables;  //number of distinct tables seen while indexing the regions

    // Both counters are maintained by regionMoved().
    int numMovedRegions = 0; //num moved regions from the initial configuration
    int numMovedMetaRegions = 0;       //num of moved regions that are META
82  
83      protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState,  Map<String, List<RegionLoad>> loads,
84          RegionLocationFinder regionFinder) {
85        serversToIndex = new HashMap<ServerName, Integer>(clusterState.size());
86        tablesToIndex = new HashMap<Integer, Integer>();
87        //regionsToIndex = new HashMap<HRegionInfo, Integer>();
88  
89        //TODO: We should get the list of tables from master
90        tables = new ArrayList<byte[]>();
91  
92        numServers = clusterState.size();
93        numRegions = 0;
94  
95        for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
96          numRegions += entry.getValue().size();
97        }
98  
99        regionsPerServer = new int[clusterState.size()][];
100       servers = new ServerName[numServers];
101       regions = new HRegionInfo[numRegions];
102       regionIndexToServerIndex = new int[numRegions];
103       initialRegionIndexToServerIndex = new int[numRegions];
104       regionIndexToTableIndex = new int[numRegions];
105       regionLoads = new List[numRegions];
106       regionLocations = new int[numRegions][];
107 
108       int tableIndex = 0, serverIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
109       // populate serversToIndex first
110       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
111         servers[serverIndex] = entry.getKey();
112         regionsPerServer[serverIndex] = new int[entry.getValue().size()];
113         serversToIndex.put(servers[serverIndex], Integer.valueOf(serverIndex));
114         serverIndex++;
115       }
116       serverIndex = 0;
117       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
118         regionPerServerIndex = 0;
119         for (HRegionInfo region : entry.getValue()) {
120           byte[] tableName = region.getTableName();
121           int tableHash = Bytes.mapKey(tableName);
122           Integer idx = tablesToIndex.get(tableHash);
123           if (idx == null) {
124             tables.add(tableName);
125             idx = tableIndex;
126             tablesToIndex.put(tableHash, tableIndex++);
127           }
128 
129           regions[regionIndex] = region;
130           regionIndexToServerIndex[regionIndex] = serverIndex;
131           initialRegionIndexToServerIndex[regionIndex] = serverIndex;
132           regionIndexToTableIndex[regionIndex] = idx;
133           regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
134 
135           //region load
136           if (loads != null) {
137             List<RegionLoad> rl = loads.get(region.getRegionNameAsString());
138             // That could have failed if the RegionLoad is using the other regionName
139             if (rl == null) {
140               // Try getting the region load using encoded name.
141               rl = loads.get(region.getEncodedName());
142             }
143             regionLoads[regionIndex] = rl;
144           }
145 
146           if (regionFinder != null) {
147             //region location
148             List<ServerName> loc = regionFinder.getTopBlockLocations(region);
149             regionLocations[regionIndex] = new int[loc.size()];
150             for (int i=0; i < loc.size(); i++) {
151               regionLocations[regionIndex][i] =
152                   loc.get(i) == null ? -1 :
153                     (serversToIndex.get(loc.get(i)) == null ? -1 : serversToIndex.get(loc.get(i)));
154             }
155           }
156 
157           regionIndex++;
158         }
159         serverIndex++;
160       }
161 
162       numTables = tables.size();
163       numRegionsPerServerPerTable = new int[numServers][numTables];
164 
165       for (int i = 0; i < numServers; i++) {
166         for (int j = 0; j < numTables; j++) {
167           numRegionsPerServerPerTable[i][j] = 0;
168         }
169       }
170 
171       for (int i=0; i < regionIndexToServerIndex.length; i++) {
172         numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
173       }
174 
175       numMaxRegionsPerTable = new int[numTables];
176       for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
177         for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) {
178           if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
179             numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
180           }
181         }
182       }
183     }
184 
185     public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) {
186       //swap
187       if (rRegion >= 0 && lRegion >= 0) {
188         regionMoved(rRegion, rServer, lServer);
189         regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion);
190         regionMoved(lRegion, lServer, rServer);
191         regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion);
192       } else if (rRegion >= 0) { //move rRegion
193         regionMoved(rRegion, rServer, lServer);
194         regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion);
195         regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion);
196       } else if (lRegion >= 0) { //move lRegion
197         regionMoved(lRegion, lServer, rServer);
198         regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion);
199         regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion);
200       }
201     }
202 
203     /** Region moved out of the server */
204     void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) {
205       regionIndexToServerIndex[regionIndex] = newServerIndex;
206       if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) {
207         numMovedRegions--; //region moved back to original location
208         if (regions[regionIndex].isMetaRegion()) {
209           numMovedMetaRegions--;
210         }
211       } else if (initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) {
212         numMovedRegions++; //region moved from original location
213         if (regions[regionIndex].isMetaRegion()) {
214           numMovedMetaRegions++;
215         }
216       }
217       int tableIndex = regionIndexToTableIndex[regionIndex];
218       numRegionsPerServerPerTable[oldServerIndex][tableIndex]--;
219       numRegionsPerServerPerTable[newServerIndex][tableIndex]++;
220 
221       //check whether this caused maxRegionsPerTable in the new Server to be updated
222       if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
223         numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex];
224       } else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1)
225           == numMaxRegionsPerTable[tableIndex]) {
226         //recompute maxRegionsPerTable since the previous value was coming from the old server
227         for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
228           if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
229             numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
230           }
231         }
232       }
233     }
234 
235     int[] removeRegion(int[] regions, int regionIndex) {
236       //TODO: this maybe costly. Consider using linked lists
237       int[] newRegions = new int[regions.length - 1];
238       int i = 0;
239       for (i = 0; i < regions.length; i++) {
240         if (regions[i] == regionIndex) {
241           break;
242         }
243         newRegions[i] = regions[i];
244       }
245       System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
246       return newRegions;
247     }
248 
249     int[] addRegion(int[] regions, int regionIndex) {
250       int[] newRegions = new int[regions.length + 1];
251       System.arraycopy(regions, 0, newRegions, 0, regions.length);
252       newRegions[newRegions.length - 1] = regionIndex;
253       return newRegions;
254     }
255 
256     int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
257       int i = 0;
258       for (i = 0; i < regions.length; i++) {
259         if (regions[i] == regionIndex) {
260           regions[i] = newRegionIndex;
261           break;
262         }
263       }
264       return regions;
265     }
266   }
267 
  // Slop for regions: read from "hbase.regions.slop" by setConf (default 0.2) and
  // clamped to [0, 1]; needsBalance uses it to widen the band around the average load.
  private float slop;
  // Configuration last handed to setConf; returned as-is by getConf.
  private Configuration config;
  // Shared random source, seeded from the wall-clock time when the class is initialized.
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);

  // Stored by setMasterServices; not read anywhere in this class.
  protected MasterServices services;
275 
276   @Override
277   public void setConf(Configuration conf) {
278     this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
279     if (slop < 0) slop = 0;
280     else if (slop > 1) slop = 1;
281     this.config = conf;
282   }
283 
284   @Override
285   public Configuration getConf() {
286     return this.config;
287   }
288 
289   public void setClusterStatus(ClusterStatus st) {
290     // Not used except for the StocasticBalancer
291   }
292 
293   public void setMasterServices(MasterServices masterServices) {
294     this.services = masterServices;
295   }
296 
297   protected boolean needsBalance(ClusterLoadState cs) {
298     if (cs.getNumServers() == 0) {
299       LOG.debug("numServers=0 so skipping load balancing");
300       return false;
301     }
302     // Check if we even need to do any load balancing
303     // HBASE-3681 check sloppiness first
304     float average = cs.getLoadAverage(); // for logging
305     int floor = (int) Math.floor(average * (1 - slop));
306     int ceiling = (int) Math.ceil(average * (1 + slop));
307     if (!(cs.getMinLoad() > ceiling || cs.getMaxLoad() < floor)) {
308       NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
309       LOG.info("Skipping load balancing because balanced cluster; " +
310           "servers=" + cs.getNumServers() + " " +
311           "regions=" + cs.getNumRegions() + " average=" + average + " " +
312           "mostloaded=" + serversByLoad.lastKey().getLoad() +
313           " leastloaded=" + serversByLoad.firstKey().getLoad());
314       return false;
315     }
316     return true;
317   }
318 
319   /**
320    * Generates a bulk assignment plan to be used on cluster startup using a
321    * simple round-robin assignment.
322    * <p>
323    * Takes a list of all the regions and all the servers in the cluster and
324    * returns a map of each server to the regions that it should be assigned.
325    * <p>
326    * Currently implemented as a round-robin assignment. Same invariant as load
327    * balancing, all servers holding floor(avg) or ceiling(avg).
328    *
329    * TODO: Use block locations from HDFS to place regions with their blocks
330    *
331    * @param regions all regions
332    * @param servers all servers
333    * @return map of server to the regions it should take, or null if no
334    *         assignment is possible (ie. no regions or no servers)
335    */
336   public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
337       List<ServerName> servers) {
338     if (regions.isEmpty() || servers.isEmpty()) {
339       return null;
340     }
341     Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
342     int numRegions = regions.size();
343     int numServers = servers.size();
344     int max = (int) Math.ceil((float) numRegions / numServers);
345     int serverIdx = 0;
346     if (numServers > 1) {
347       serverIdx = RANDOM.nextInt(numServers);
348     }
349     int regionIdx = 0;
350     for (int j = 0; j < numServers; j++) {
351       ServerName server = servers.get((j + serverIdx) % numServers);
352       List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
353       for (int i = regionIdx; i < numRegions; i += numServers) {
354         serverRegions.add(regions.get(i % numRegions));
355       }
356       assignments.put(server, serverRegions);
357       regionIdx++;
358     }
359     return assignments;
360   }
361 
362   /**
363    * Generates an immediate assignment plan to be used by a new master for
364    * regions in transition that do not have an already known destination.
365    *
366    * Takes a list of regions that need immediate assignment and a list of all
367    * available servers. Returns a map of regions to the server they should be
368    * assigned to.
369    *
370    * This method will return quickly and does not do any intelligent balancing.
371    * The goal is to make a fast decision not the best decision possible.
372    *
373    * Currently this is random.
374    *
375    * @param regions
376    * @param servers
377    * @return map of regions to the server it should be assigned to
378    */
379   public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
380       List<ServerName> servers) {
381     Map<HRegionInfo, ServerName> assignments = new TreeMap<HRegionInfo, ServerName>();
382     for (HRegionInfo region : regions) {
383       assignments.put(region, randomAssignment(region, servers));
384     }
385     return assignments;
386   }
387 
388   /**
389    * Used to assign a single region to a random server.
390    */
391   public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
392     if (servers == null || servers.isEmpty()) {
393       LOG.warn("Wanted to do random assignment but no servers to assign to");
394       return null;
395     }
396     return servers.get(RANDOM.nextInt(servers.size()));
397   }
398 
399   /**
400    * Generates a bulk assignment startup plan, attempting to reuse the existing
401    * assignment information from META, but adjusting for the specified list of
402    * available/online servers available for assignment.
403    * <p>
404    * Takes a map of all regions to their existing assignment from META. Also
405    * takes a list of online servers for regions to be assigned to. Attempts to
406    * retain all assignment, so in some instances initial assignment will not be
407    * completely balanced.
408    * <p>
409    * Any leftover regions without an existing server to be assigned to will be
410    * assigned randomly to available servers.
411    *
412    * @param regions regions and existing assignment from meta
413    * @param servers available servers
414    * @return map of servers and regions to be assigned to them
415    */
416   public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
417       List<ServerName> servers) {
418     // Group all of the old assignments by their hostname.
419     // We can't group directly by ServerName since the servers all have
420     // new start-codes.
421 
422     // Group the servers by their hostname. It's possible we have multiple
423     // servers on the same host on different ports.
424     ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
425     for (ServerName server : servers) {
426       serversByHostname.put(server.getHostname(), server);
427     }
428 
429     // Now come up with new assignments
430     Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
431 
432     for (ServerName server : servers) {
433       assignments.put(server, new ArrayList<HRegionInfo>());
434     }
435 
436     // Collection of the hostnames that used to have regions
437     // assigned, but for which we no longer have any RS running
438     // after the cluster restart.
439     Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
440 
441     int numRandomAssignments = 0;
442     int numRetainedAssigments = 0;
443     for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
444       HRegionInfo region = entry.getKey();
445       ServerName oldServerName = entry.getValue();
446       List<ServerName> localServers = new ArrayList<ServerName>();
447       if (oldServerName != null) {
448         localServers = serversByHostname.get(oldServerName.getHostname());
449       }
450       if (localServers.isEmpty()) {
451         // No servers on the new cluster match up with this hostname,
452         // assign randomly.
453         ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
454         assignments.get(randomServer).add(region);
455         numRandomAssignments++;
456         if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
457       } else if (localServers.size() == 1) {
458         // the usual case - one new server on same host
459         assignments.get(localServers.get(0)).add(region);
460         numRetainedAssigments++;
461       } else {
462         // multiple new servers in the cluster on this same host
463         int size = localServers.size();
464         ServerName target = localServers.get(RANDOM.nextInt(size));
465         assignments.get(target).add(region);
466         numRetainedAssigments++;
467       }
468     }
469 
470     String randomAssignMsg = "";
471     if (numRandomAssignments > 0) {
472       randomAssignMsg =
473           numRandomAssignments + " regions were assigned "
474               + "to random hosts, since the old hosts for these regions are no "
475               + "longer present in the cluster. These hosts were:\n  "
476               + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
477     }
478 
479     LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
480         + " retained the pre-restart assignment. " + randomAssignMsg);
481     return assignments;
482   }
483 
484 }