View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.Comparator;
23  import java.util.Deque;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Map.Entry;
28  import java.util.NavigableMap;
29  import java.util.Random;
30  import java.util.Set;
31  import java.util.TreeMap;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.ClusterStatus;
37  import org.apache.hadoop.hbase.HBaseIOException;
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.RegionLoad;
40  import org.apache.hadoop.hbase.ServerName;
41  import org.apache.hadoop.hbase.master.AssignmentManager;
42  import org.apache.hadoop.hbase.master.LoadBalancer;
43  import org.apache.hadoop.hbase.master.MasterServices;
44  
45  import com.google.common.base.Joiner;
46  import com.google.common.collect.ArrayListMultimap;
47  import com.google.common.collect.Sets;
48  
/**
 * The base class for load balancers. It provides the functions used by
 * {@link AssignmentManager} to assign regions in the edge cases. It doesn't
 * provide an implementation of the actual balancing algorithm.
 */
55  public abstract class BaseLoadBalancer implements LoadBalancer {
56    private static final int MIN_SERVER_BALANCE = 2;
57    private volatile boolean stopped = false;
58  
59    /**
60     * An efficient array based implementation similar to ClusterState for keeping
61     * the status of the cluster in terms of region assignment and distribution.
62     * To be used by LoadBalancers.
63     */
64    protected static class Cluster {
65      ServerName[] servers;
66      ArrayList<String> tables;
67      HRegionInfo[] regions;
68      Deque<RegionLoad>[] regionLoads;
69      int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
70  
71      int[][] regionsPerServer;            //serverIndex -> region list
72      int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
73      int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
74      int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
75      int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
76      int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
77  
78      Integer[] serverIndicesSortedByRegionCount;
79  
80      Map<String, Integer> serversToIndex;
81      Map<String, Integer> tablesToIndex;
82  
83      int numRegions;
84      int numServers;
85      int numTables;
86  
87      int numMovedRegions = 0; //num moved regions from the initial configuration
88      int numMovedMetaRegions = 0;       //num of moved regions that are META
89  
90      protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState,  Map<String, Deque<RegionLoad>> loads,
91          RegionLocationFinder regionFinder) {
92  
93        serversToIndex = new HashMap<String, Integer>();
94        tablesToIndex = new HashMap<String, Integer>();
95        //regionsToIndex = new HashMap<HRegionInfo, Integer>();
96  
97        //TODO: We should get the list of tables from master
98        tables = new ArrayList<String>();
99  
100 
101       numRegions = 0;
102 
103       int serverIndex = 0;
104 
105       // Use servername and port as there can be dead servers in this list. We want everything with
106       // a matching hostname and port to have the same index.
107       for (ServerName sn:clusterState.keySet()) {
108         if (serversToIndex.get(sn.getHostAndPort()) == null) {
109           serversToIndex.put(sn.getHostAndPort(), serverIndex++);
110         }
111       }
112 
113       // Count how many regions there are.
114       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
115         numRegions += entry.getValue().size();
116       }
117 
118       numServers = serversToIndex.size();
119       regionsPerServer = new int[serversToIndex.size()][];
120 
121       servers = new ServerName[numServers];
122       regions = new HRegionInfo[numRegions];
123       regionIndexToServerIndex = new int[numRegions];
124       initialRegionIndexToServerIndex = new int[numRegions];
125       regionIndexToTableIndex = new int[numRegions];
126       regionLoads = new Deque[numRegions];
127       regionLocations = new int[numRegions][];
128       serverIndicesSortedByRegionCount = new Integer[numServers];
129 
130       int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
131 
132       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
133         serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
134 
135         // keep the servername if this is the first server name for this hostname
136         // or this servername has the newest startcode.
137         if (servers[serverIndex] == null ||
138             servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
139           servers[serverIndex] = entry.getKey();
140         }
141 
142         if (regionsPerServer[serverIndex] != null) {
143           // there is another server with the same hostAndPort in ClusterState.
144           // allocate the array for the total size
145           regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
146         } else {
147           regionsPerServer[serverIndex] = new int[entry.getValue().size()];
148         }
149         serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
150       }
151 
152       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
153         serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
154         regionPerServerIndex = 0;
155 
156         for (HRegionInfo region : entry.getValue()) {
157           String tableName = region.getTable().getNameAsString();
158           Integer idx = tablesToIndex.get(tableName);
159           if (idx == null) {
160             tables.add(tableName);
161             idx = tableIndex;
162             tablesToIndex.put(tableName, tableIndex++);
163           }
164 
165           regions[regionIndex] = region;
166           regionIndexToServerIndex[regionIndex] = serverIndex;
167           initialRegionIndexToServerIndex[regionIndex] = serverIndex;
168           regionIndexToTableIndex[regionIndex] = idx;
169           regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
170 
171           // region load
172           if (loads != null) {
173             Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
174             // That could have failed if the RegionLoad is using the other regionName
175             if (rl == null) {
176               // Try getting the region load using encoded name.
177               rl = loads.get(region.getEncodedName());
178             }
179             regionLoads[regionIndex] = rl;
180           }
181 
182           if (regionFinder != null) {
183             //region location
184             List<ServerName> loc = regionFinder.getTopBlockLocations(region);
185             regionLocations[regionIndex] = new int[loc.size()];
186             for (int i=0; i < loc.size(); i++) {
187               regionLocations[regionIndex][i] =
188                   loc.get(i) == null ? -1 :
189                     (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 : serversToIndex.get(loc.get(i).getHostAndPort()));
190             }
191           }
192 
193           regionIndex++;
194         }
195       }
196 
197       numTables = tables.size();
198       numRegionsPerServerPerTable = new int[numServers][numTables];
199 
200       for (int i = 0; i < numServers; i++) {
201         for (int j = 0; j < numTables; j++) {
202           numRegionsPerServerPerTable[i][j] = 0;
203         }
204       }
205 
206       for (int i=0; i < regionIndexToServerIndex.length; i++) {
207         numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
208       }
209 
210       numMaxRegionsPerTable = new int[numTables];
211       for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
212         for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) {
213           if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
214             numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
215           }
216         }
217       }
218     }
219 
220     public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) {
221       //swap
222       if (rRegion >= 0 && lRegion >= 0) {
223         regionMoved(rRegion, rServer, lServer);
224         regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion);
225         regionMoved(lRegion, lServer, rServer);
226         regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion);
227       } else if (rRegion >= 0) { //move rRegion
228         regionMoved(rRegion, rServer, lServer);
229         regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion);
230         regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion);
231       } else if (lRegion >= 0) { //move lRegion
232         regionMoved(lRegion, lServer, rServer);
233         regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion);
234         regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion);
235       }
236     }
237 
238     /** Region moved out of the server */
239     void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) {
240       regionIndexToServerIndex[regionIndex] = newServerIndex;
241       if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) {
242         numMovedRegions--; //region moved back to original location
243         if (regions[regionIndex].isMetaRegion()) {
244           numMovedMetaRegions--;
245         }
246       } else if (initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) {
247         numMovedRegions++; //region moved from original location
248         if (regions[regionIndex].isMetaRegion()) {
249           numMovedMetaRegions++;
250         }
251       }
252       int tableIndex = regionIndexToTableIndex[regionIndex];
253       numRegionsPerServerPerTable[oldServerIndex][tableIndex]--;
254       numRegionsPerServerPerTable[newServerIndex][tableIndex]++;
255 
256       //check whether this caused maxRegionsPerTable in the new Server to be updated
257       if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
258         numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex];
259       } else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1)
260           == numMaxRegionsPerTable[tableIndex]) {
261         //recompute maxRegionsPerTable since the previous value was coming from the old server
262         for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
263           if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
264             numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
265           }
266         }
267       }
268     }
269 
270     int[] removeRegion(int[] regions, int regionIndex) {
271       //TODO: this maybe costly. Consider using linked lists
272       int[] newRegions = new int[regions.length - 1];
273       int i = 0;
274       for (i = 0; i < regions.length; i++) {
275         if (regions[i] == regionIndex) {
276           break;
277         }
278         newRegions[i] = regions[i];
279       }
280       System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
281       return newRegions;
282     }
283 
284     int[] addRegion(int[] regions, int regionIndex) {
285       int[] newRegions = new int[regions.length + 1];
286       System.arraycopy(regions, 0, newRegions, 0, regions.length);
287       newRegions[newRegions.length - 1] = regionIndex;
288       return newRegions;
289     }
290 
291     int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
292       int i = 0;
293       for (i = 0; i < regions.length; i++) {
294         if (regions[i] == regionIndex) {
295           regions[i] = newRegionIndex;
296           break;
297         }
298       }
299       return regions;
300     }
301 
302     void sortServersByRegionCount() {
303       Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
304     }
305 
306     int getNumRegions(int server) {
307       return regionsPerServer[server].length;
308     }
309 
310     private Comparator<Integer> numRegionsComparator = new Comparator<Integer>() {
311       @Override
312       public int compare(Integer integer, Integer integer2) {
313         return Integer.valueOf(getNumRegions(integer)).compareTo(getNumRegions(integer2));
314       }
315     };
316 
317     @Override
318     public String toString() {
319       String desc = "Cluster{" +
320           "servers=[";
321           for(ServerName sn:servers) {
322              desc += sn.getHostAndPort() + ", ";
323           }
324           desc +=
325           ", serverIndicesSortedByRegionCount="+
326           Arrays.toString(serverIndicesSortedByRegionCount) +
327           ", regionsPerServer=[";
328 
329           for (int[]r:regionsPerServer) {
330             desc += Arrays.toString(r);
331           }
332           desc += "]" +
333           ", numMaxRegionsPerTable=" +
334           Arrays.toString(numMaxRegionsPerTable) +
335           ", numRegions=" +
336           numRegions +
337           ", numServers=" +
338           numServers +
339           ", numTables=" +
340           numTables +
341           ", numMovedRegions=" +
342           numMovedRegions +
343           ", numMovedMetaRegions=" +
344           numMovedMetaRegions +
345           '}';
346       return desc;
347     }
348   }
349 
350   // slop for regions
351   protected float slop;
352   private Configuration config;
353   private static final Random RANDOM = new Random(System.currentTimeMillis());
354   private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
355 
356   protected final MetricsBalancer metricsBalancer = new MetricsBalancer();
357   protected MasterServices services;
358 
359   @Override
360   public void setConf(Configuration conf) {
361     setSlop(conf);
362     if (slop < 0) slop = 0;
363     else if (slop > 1) slop = 1;
364 
365     this.config = conf;
366   }
367 
368   protected void setSlop(Configuration conf) {
369     this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
370   }
371 
372   @Override
373   public Configuration getConf() {
374     return this.config;
375   }
376 
377   @Override
378   public void setClusterStatus(ClusterStatus st) {
379     // Not used except for the StocasticBalancer
380   }
381 
382   @Override
383   public void setMasterServices(MasterServices masterServices) {
384     this.services = masterServices;
385   }
386 
387   protected boolean needsBalance(ClusterLoadState cs) {
388     if (cs.getNumServers() < MIN_SERVER_BALANCE) {
389       if (LOG.isDebugEnabled()) {
390         LOG.debug("Not running balancer because only " + cs.getNumServers()
391             + " active regionserver(s)");
392       }
393       return false;
394     }
395     // Check if we even need to do any load balancing
396     // HBASE-3681 check sloppiness first
397     float average = cs.getLoadAverage(); // for logging
398     int floor = (int) Math.floor(average * (1 - slop));
399     int ceiling = (int) Math.ceil(average * (1 + slop));
400     if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
401       NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
402       if (LOG.isTraceEnabled()) {
403         // If nothing to balance, then don't say anything unless trace-level logging.
404         LOG.trace("Skipping load balancing because balanced cluster; " +
405           "servers=" + cs.getNumServers() + " " +
406           "regions=" + cs.getNumRegions() + " average=" + average + " " +
407           "mostloaded=" + serversByLoad.lastKey().getLoad() +
408           " leastloaded=" + serversByLoad.firstKey().getLoad());
409       }
410       return false;
411     }
412     return true;
413   }
414 
415   /**
416    * Generates a bulk assignment plan to be used on cluster startup using a
417    * simple round-robin assignment.
418    * <p>
419    * Takes a list of all the regions and all the servers in the cluster and
420    * returns a map of each server to the regions that it should be assigned.
421    * <p>
422    * Currently implemented as a round-robin assignment. Same invariant as load
423    * balancing, all servers holding floor(avg) or ceiling(avg).
424    *
425    * TODO: Use block locations from HDFS to place regions with their blocks
426    *
427    * @param regions all regions
428    * @param servers all servers
429    * @return map of server to the regions it should take, or null if no
430    *         assignment is possible (ie. no regions or no servers)
431    */
432   @Override
433   public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
434       List<ServerName> servers) {
435     metricsBalancer.incrMiscInvocations();
436 
437     if (regions.isEmpty() || servers.isEmpty()) {
438       return null;
439     }
440     Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
441     int numRegions = regions.size();
442     int numServers = servers.size();
443     int max = (int) Math.ceil((float) numRegions / numServers);
444     int serverIdx = 0;
445     if (numServers > 1) {
446       serverIdx = RANDOM.nextInt(numServers);
447     }
448     int regionIdx = 0;
449     for (int j = 0; j < numServers; j++) {
450       ServerName server = servers.get((j + serverIdx) % numServers);
451       List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
452       for (int i = regionIdx; i < numRegions; i += numServers) {
453         serverRegions.add(regions.get(i % numRegions));
454       }
455       assignments.put(server, serverRegions);
456       regionIdx++;
457     }
458     return assignments;
459   }
460 
461   /**
462    * Generates an immediate assignment plan to be used by a new master for
463    * regions in transition that do not have an already known destination.
464    *
465    * Takes a list of regions that need immediate assignment and a list of all
466    * available servers. Returns a map of regions to the server they should be
467    * assigned to.
468    *
469    * This method will return quickly and does not do any intelligent balancing.
470    * The goal is to make a fast decision not the best decision possible.
471    *
472    * Currently this is random.
473    *
474    * @param regions
475    * @param servers
476    * @return map of regions to the server it should be assigned to
477    */
478   @Override
479   public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
480       List<ServerName> servers) {
481     metricsBalancer.incrMiscInvocations();
482 
483     Map<HRegionInfo, ServerName> assignments = new TreeMap<HRegionInfo, ServerName>();
484     for (HRegionInfo region : regions) {
485       assignments.put(region, randomAssignment(region, servers));
486     }
487     return assignments;
488   }
489 
490   /**
491    * Used to assign a single region to a random server.
492    */
493   @Override
494   public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
495     metricsBalancer.incrMiscInvocations();
496 
497     if (servers == null || servers.isEmpty()) {
498       LOG.warn("Wanted to do random assignment but no servers to assign to");
499       return null;
500     }
501     return servers.get(RANDOM.nextInt(servers.size()));
502   }
503 
504   /**
505    * Generates a bulk assignment startup plan, attempting to reuse the existing
506    * assignment information from META, but adjusting for the specified list of
507    * available/online servers available for assignment.
508    * <p>
509    * Takes a map of all regions to their existing assignment from META. Also
510    * takes a list of online servers for regions to be assigned to. Attempts to
511    * retain all assignment, so in some instances initial assignment will not be
512    * completely balanced.
513    * <p>
514    * Any leftover regions without an existing server to be assigned to will be
515    * assigned randomly to available servers.
516    *
517    * @param regions regions and existing assignment from meta
518    * @param servers available servers
519    * @return map of servers and regions to be assigned to them
520    */
521   @Override
522   public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
523       List<ServerName> servers) {
524     // Update metrics
525     metricsBalancer.incrMiscInvocations();
526 
527     // Group all of the old assignments by their hostname.
528     // We can't group directly by ServerName since the servers all have
529     // new start-codes.
530 
531     // Group the servers by their hostname. It's possible we have multiple
532     // servers on the same host on different ports.
533     ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
534     for (ServerName server : servers) {
535       serversByHostname.put(server.getHostname(), server);
536     }
537 
538     // Now come up with new assignments
539     Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
540 
541     for (ServerName server : servers) {
542       assignments.put(server, new ArrayList<HRegionInfo>());
543     }
544 
545     // Collection of the hostnames that used to have regions
546     // assigned, but for which we no longer have any RS running
547     // after the cluster restart.
548     Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
549 
550     int numRandomAssignments = 0;
551     int numRetainedAssigments = 0;
552     for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
553       HRegionInfo region = entry.getKey();
554       ServerName oldServerName = entry.getValue();
555       List<ServerName> localServers = new ArrayList<ServerName>();
556       if (oldServerName != null) {
557         localServers = serversByHostname.get(oldServerName.getHostname());
558       }
559       if (localServers.isEmpty()) {
560         // No servers on the new cluster match up with this hostname,
561         // assign randomly.
562         ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
563         assignments.get(randomServer).add(region);
564         numRandomAssignments++;
565         if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
566       } else if (localServers.size() == 1) {
567         // the usual case - one new server on same host
568         assignments.get(localServers.get(0)).add(region);
569         numRetainedAssigments++;
570       } else {
571         // multiple new servers in the cluster on this same host
572         int size = localServers.size();
573         ServerName target =
574             localServers.contains(oldServerName) ? oldServerName : localServers.get(RANDOM
575                 .nextInt(size));
576         assignments.get(target).add(region);
577         numRetainedAssigments++;
578       }
579     }
580 
581     String randomAssignMsg = "";
582     if (numRandomAssignments > 0) {
583       randomAssignMsg =
584           numRandomAssignments + " regions were assigned "
585               + "to random hosts, since the old hosts for these regions are no "
586               + "longer present in the cluster. These hosts were:\n  "
587               + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
588     }
589 
590     LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
591         + " retained the pre-restart assignment. " + randomAssignMsg);
592     return assignments;
593   }
594 
595   @Override
596   public void initialize() throws HBaseIOException{
597   }
598 
599   @Override
600   public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
601   }
602 
603   @Override
604   public void regionOffline(HRegionInfo regionInfo) {
605   }
606 
607   @Override
608   public boolean isStopped() {
609     return stopped;
610   }
611 
612   @Override
613   public void stop(String why) {
614     LOG.info("Load Balancer stop requested: "+why);
615     stopped = true;
616   }
617 }