001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.balancer;
020
021import edu.umd.cs.findbugs.annotations.NonNull;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.Comparator;
028import java.util.Deque;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033import java.util.Map.Entry;
034import java.util.NavigableMap;
035import java.util.Random;
036import java.util.Set;
037import java.util.TreeMap;
038import java.util.function.Predicate;
039import java.util.stream.Collectors;
040import org.apache.commons.lang3.NotImplementedException;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.ClusterMetrics;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseIOException;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HDFSBlocksDistribution;
047import org.apache.hadoop.hbase.ServerMetrics;
048import org.apache.hadoop.hbase.ServerName;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.RegionReplicaUtil;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.master.LoadBalancer;
054import org.apache.hadoop.hbase.master.MasterServices;
055import org.apache.hadoop.hbase.master.RackManager;
056import org.apache.hadoop.hbase.master.RegionPlan;
057import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
058import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
059import org.apache.yetus.audience.InterfaceAudience;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
064import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
065import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
066import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
067
/**
 * The base class for load balancers. It provides the functions used by
 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
 * in the edge cases. It doesn't provide an implementation of the
 * actual balancing algorithm.
 */
075@InterfaceAudience.Private
076public abstract class BaseLoadBalancer implements LoadBalancer {
077
078  public static final String BALANCER_DECISION_BUFFER_ENABLED =
079    "hbase.master.balancer.decision.buffer.enabled";
080  public static final boolean DEFAULT_BALANCER_DECISION_BUFFER_ENABLED = false;
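  // Illustrative configuration sketch (assumed usage, not taken from this class): the
  // decision ring buffer is controlled by the boolean key above, e.g.
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(BALANCER_DECISION_BUFFER_ENABLED, true);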
081
082  protected static final int MIN_SERVER_BALANCE = 2;
083  private volatile boolean stopped = false;
084
085  private static final List<RegionInfo> EMPTY_REGION_LIST = Collections.emptyList();
086
087  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
088    = load -> load.getRegionMetrics().isEmpty();
089
090  protected RegionLocationFinder regionFinder;
091  protected boolean useRegionFinder;
092  protected boolean isByTable = false;
093
  /**
   * Used to add balancer decision history to the ring buffer.
   */
097  protected NamedQueueRecorder namedQueueRecorder;
098
099  private static class DefaultRackManager extends RackManager {
100    @Override
101    public String getRack(ServerName server) {
102      return UNKNOWN_RACK;
103    }
104  }
105
106  /**
107   * The constructor that uses the basic MetricsBalancer
108   */
109  protected BaseLoadBalancer() {
110    metricsBalancer = new MetricsBalancer();
111    createRegionFinder();
112  }
113
  /**
   * This constructor accepts an instance of MetricsBalancer,
   * which will be used instead of creating a new one.
   */
118  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
119    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
120    createRegionFinder();
121  }
122
123  private void createRegionFinder() {
124    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
125    if (useRegionFinder) {
126      regionFinder = new RegionLocationFinder();
127    }
128  }
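  // Illustrative sketch (assumption: the key is set in the master's configuration, e.g.
  // hbase-site.xml, before this balancer is constructed):
  //   conf.setBoolean("hbase.master.balancer.uselocality", false); // no RegionLocationFinder created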
129
  /**
   * An efficient array-based implementation similar to ClusterState for keeping
   * the status of the cluster in terms of region assignment and distribution.
   * Load balancers such as the StochasticLoadBalancer use this Cluster object because
   * hundreds of thousands of hashmap manipulations would be very costly, which is why this
   * class mostly uses indexes and arrays.
   *
   * Cluster tracks a list of unassigned regions, region assignments, and the server
   * topology in terms of server names, hostnames and racks.
   */
140  protected static class Cluster {
141    ServerName[] servers;
    String[] hosts; // ServerName uniquely identifies a region server; multiple region servers can run on the same host
143    String[] racks;
144    boolean multiServersPerHost = false; // whether or not any host has more than one server
145
146    ArrayList<String> tables;
147    RegionInfo[] regions;
148    Deque<BalancerRegionLoad>[] regionLoads;
149    private RegionLocationFinder regionFinder;
150
151    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
152
153    int[]   serverIndexToHostIndex;      //serverIndex -> host index
154    int[]   serverIndexToRackIndex;      //serverIndex -> rack index
155
156    int[][] regionsPerServer;            //serverIndex -> region list
157    int[]   serverIndexToRegionsOffset;  //serverIndex -> offset of region list
158    int[][] regionsPerHost;              //hostIndex -> list of regions
159    int[][] regionsPerRack;              //rackIndex -> region list
160    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
161    int[][] primariesOfRegionsPerHost;   //hostIndex -> sorted list of regions by primary region index
162    int[][] primariesOfRegionsPerRack;   //rackIndex -> sorted list of regions by primary region index
163
164    int[][] serversPerHost;              //hostIndex -> list of server indexes
165    int[][] serversPerRack;              //rackIndex -> list of server indexes
166    int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
167    int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
168    int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
169    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
170    int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
171    int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
    boolean hasRegionReplicas = false;   //whether there are regions with replicas
173
174    Integer[] serverIndicesSortedByRegionCount;
175    Integer[] serverIndicesSortedByLocality;
176
177    Map<String, Integer> serversToIndex;
178    Map<String, Integer> hostsToIndex;
179    Map<String, Integer> racksToIndex;
180    Map<String, Integer> tablesToIndex;
181    Map<RegionInfo, Integer> regionsToIndex;
182    float[] localityPerServer;
183
184    int numServers;
185    int numHosts;
186    int numRacks;
187    int numTables;
188    int numRegions;
189
190    int numMovedRegions = 0; //num moved regions from the initial configuration
191    Map<ServerName, List<RegionInfo>> clusterState;
192
193    protected final RackManager rackManager;
194    // Maps region -> rackIndex -> locality of region on rack
195    private float[][] rackLocalities;
196    // Maps localityType -> region -> [server|rack]Index with highest locality
197    private int[][] regionsToMostLocalEntities;
198
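    // Illustrative index invariants (sketch, not executable on its own): for an assigned
    // region r hosted on server s,
    //   regionIndexToServerIndex[r] == s  and  r appears in regionsPerServer[s];
    // the reverse maps translate objects back into indexes, e.g.
    //   int r = regionsToIndex.get(regionInfo);
    //   int s = serversToIndex.get(serverName.getHostAndPort());
    //   boolean hostedHere = regionIndexToServerIndex[r] == s;
    // where regionInfo and serverName are hypothetical inputs already known to this Cluster.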
199    protected Cluster(
200        Map<ServerName, List<RegionInfo>> clusterState,
201        Map<String, Deque<BalancerRegionLoad>> loads,
202        RegionLocationFinder regionFinder,
203        RackManager rackManager) {
204      this(null, clusterState, loads, regionFinder, rackManager);
205    }
206
207    @SuppressWarnings("unchecked")
208    protected Cluster(
209        Collection<RegionInfo> unassignedRegions,
210        Map<ServerName, List<RegionInfo>> clusterState,
211        Map<String, Deque<BalancerRegionLoad>> loads,
212        RegionLocationFinder regionFinder,
213        RackManager rackManager) {
214
215      if (unassignedRegions == null) {
216        unassignedRegions = EMPTY_REGION_LIST;
217      }
218
219      serversToIndex = new HashMap<>();
220      hostsToIndex = new HashMap<>();
221      racksToIndex = new HashMap<>();
222      tablesToIndex = new HashMap<>();
223
224      //TODO: We should get the list of tables from master
225      tables = new ArrayList<>();
226      this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
227
228      numRegions = 0;
229
230      List<List<Integer>> serversPerHostList = new ArrayList<>();
231      List<List<Integer>> serversPerRackList = new ArrayList<>();
232      this.clusterState = clusterState;
233      this.regionFinder = regionFinder;
234
      // Use hostname and port as there can be dead servers in this list. We want everything with
      // a matching hostname and port to have the same index.
237      for (ServerName sn : clusterState.keySet()) {
238        if (sn == null) {
          LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername; " +
              "skipping; unassigned regions?");
241          if (LOG.isTraceEnabled()) {
242            LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
243          }
244          continue;
245        }
246        if (serversToIndex.get(sn.getAddress().toString()) == null) {
247          serversToIndex.put(sn.getHostAndPort(), numServers++);
248        }
249        if (!hostsToIndex.containsKey(sn.getHostname())) {
250          hostsToIndex.put(sn.getHostname(), numHosts++);
251          serversPerHostList.add(new ArrayList<>(1));
252        }
253
254        int serverIndex = serversToIndex.get(sn.getHostAndPort());
255        int hostIndex = hostsToIndex.get(sn.getHostname());
256        serversPerHostList.get(hostIndex).add(serverIndex);
257
258        String rack = this.rackManager.getRack(sn);
259        if (!racksToIndex.containsKey(rack)) {
260          racksToIndex.put(rack, numRacks++);
261          serversPerRackList.add(new ArrayList<>());
262        }
263        int rackIndex = racksToIndex.get(rack);
264        serversPerRackList.get(rackIndex).add(serverIndex);
265      }
266
267      // Count how many regions there are.
268      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
269        numRegions += entry.getValue().size();
270      }
271      numRegions += unassignedRegions.size();
272
273      regionsToIndex = new HashMap<>(numRegions);
274      servers = new ServerName[numServers];
275      serversPerHost = new int[numHosts][];
276      serversPerRack = new int[numRacks][];
277      regions = new RegionInfo[numRegions];
278      regionIndexToServerIndex = new int[numRegions];
279      initialRegionIndexToServerIndex = new int[numRegions];
280      regionIndexToTableIndex = new int[numRegions];
281      regionIndexToPrimaryIndex = new int[numRegions];
282      regionLoads = new Deque[numRegions];
283
284      regionLocations = new int[numRegions][];
285      serverIndicesSortedByRegionCount = new Integer[numServers];
286      serverIndicesSortedByLocality = new Integer[numServers];
287      localityPerServer = new float[numServers];
288
289      serverIndexToHostIndex = new int[numServers];
290      serverIndexToRackIndex = new int[numServers];
291      regionsPerServer = new int[numServers][];
292      serverIndexToRegionsOffset = new int[numServers];
293      regionsPerHost = new int[numHosts][];
294      regionsPerRack = new int[numRacks][];
295      primariesOfRegionsPerServer = new int[numServers][];
296      primariesOfRegionsPerHost = new int[numHosts][];
297      primariesOfRegionsPerRack = new int[numRacks][];
298
299      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
300
301      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
302        if (entry.getKey() == null) {
303          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
304          continue;
305        }
306        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
307
308        // keep the servername if this is the first server name for this hostname
309        // or this servername has the newest startcode.
310        if (servers[serverIndex] == null ||
311            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
312          servers[serverIndex] = entry.getKey();
313        }
314
315        if (regionsPerServer[serverIndex] != null) {
316          // there is another server with the same hostAndPort in ClusterState.
317          // allocate the array for the total size
318          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
319        } else {
320          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
321        }
322        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
323        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
324        serverIndicesSortedByLocality[serverIndex] = serverIndex;
325      }
326
327      hosts = new String[numHosts];
328      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
329        hosts[entry.getValue()] = entry.getKey();
330      }
331      racks = new String[numRacks];
332      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
333        racks[entry.getValue()] = entry.getKey();
334      }
335
336      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
337        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
338        regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
339
340        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
341        serverIndexToHostIndex[serverIndex] = hostIndex;
342
343        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
344        serverIndexToRackIndex[serverIndex] = rackIndex;
345
346        for (RegionInfo region : entry.getValue()) {
347          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
348          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
349          regionIndex++;
350        }
351        serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
352      }
353
354      for (RegionInfo region : unassignedRegions) {
355        registerRegion(region, regionIndex, -1, loads, regionFinder);
356        regionIndex++;
357      }
358
359      for (int i = 0; i < serversPerHostList.size(); i++) {
360        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
361        for (int j = 0; j < serversPerHost[i].length; j++) {
362          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
363        }
364        if (serversPerHost[i].length > 1) {
365          multiServersPerHost = true;
366        }
367      }
368
369      for (int i = 0; i < serversPerRackList.size(); i++) {
370        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
371        for (int j = 0; j < serversPerRack[i].length; j++) {
372          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
373        }
374      }
375
376      numTables = tables.size();
377      numRegionsPerServerPerTable = new int[numServers][numTables];
378
379      for (int i = 0; i < numServers; i++) {
380        for (int j = 0; j < numTables; j++) {
381          numRegionsPerServerPerTable[i][j] = 0;
382        }
383      }
384
385      for (int i=0; i < regionIndexToServerIndex.length; i++) {
386        if (regionIndexToServerIndex[i] >= 0) {
387          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
388        }
389      }
390
391      numMaxRegionsPerTable = new int[numTables];
392      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
393        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
394          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
395            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
396          }
397        }
398      }
399
400      for (int i = 0; i < regions.length; i ++) {
401        RegionInfo info = regions[i];
402        if (RegionReplicaUtil.isDefaultReplica(info)) {
403          regionIndexToPrimaryIndex[i] = i;
404        } else {
405          hasRegionReplicas = true;
406          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
407          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
408        }
409      }
410
411      for (int i = 0; i < regionsPerServer.length; i++) {
412        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
413        for (int j = 0; j < regionsPerServer[i].length; j++) {
414          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
415          primariesOfRegionsPerServer[i][j] = primaryIndex;
416        }
417        // sort the regions by primaries.
418        Arrays.sort(primariesOfRegionsPerServer[i]);
419      }
420
421      // compute regionsPerHost
422      if (multiServersPerHost) {
423        for (int i = 0 ; i < serversPerHost.length; i++) {
424          int numRegionsPerHost = 0;
425          for (int j = 0; j < serversPerHost[i].length; j++) {
426            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
427          }
428          regionsPerHost[i] = new int[numRegionsPerHost];
429          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
430        }
431        for (int i = 0 ; i < serversPerHost.length; i++) {
432          int numRegionPerHostIndex = 0;
433          for (int j = 0; j < serversPerHost[i].length; j++) {
434            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
435              int region = regionsPerServer[serversPerHost[i][j]][k];
436              regionsPerHost[i][numRegionPerHostIndex] = region;
437              int primaryIndex = regionIndexToPrimaryIndex[region];
438              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
439              numRegionPerHostIndex++;
440            }
441          }
442          // sort the regions by primaries.
443          Arrays.sort(primariesOfRegionsPerHost[i]);
444        }
445      }
446
447      // compute regionsPerRack
448      if (numRacks > 1) {
449        for (int i = 0 ; i < serversPerRack.length; i++) {
450          int numRegionsPerRack = 0;
451          for (int j = 0; j < serversPerRack[i].length; j++) {
452            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
453          }
454          regionsPerRack[i] = new int[numRegionsPerRack];
455          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
456        }
457
458        for (int i = 0 ; i < serversPerRack.length; i++) {
459          int numRegionPerRackIndex = 0;
460          for (int j = 0; j < serversPerRack[i].length; j++) {
461            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
462              int region = regionsPerServer[serversPerRack[i][j]][k];
463              regionsPerRack[i][numRegionPerRackIndex] = region;
464              int primaryIndex = regionIndexToPrimaryIndex[region];
465              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
466              numRegionPerRackIndex++;
467            }
468          }
469          // sort the regions by primaries.
470          Arrays.sort(primariesOfRegionsPerRack[i]);
471        }
472      }
473    }
474
475    /** Helper for Cluster constructor to handle a region */
476    private void registerRegion(RegionInfo region, int regionIndex,
477        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
478        RegionLocationFinder regionFinder) {
479      String tableName = region.getTable().getNameAsString();
480      if (!tablesToIndex.containsKey(tableName)) {
481        tables.add(tableName);
482        tablesToIndex.put(tableName, tablesToIndex.size());
483      }
484      int tableIndex = tablesToIndex.get(tableName);
485
486      regionsToIndex.put(region, regionIndex);
487      regions[regionIndex] = region;
488      regionIndexToServerIndex[regionIndex] = serverIndex;
489      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
490      regionIndexToTableIndex[regionIndex] = tableIndex;
491
492      // region load
493      if (loads != null) {
494        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
        // That lookup could have failed if the RegionLoad is keyed by the other region name
496        if (rl == null) {
497          // Try getting the region load using encoded name.
498          rl = loads.get(region.getEncodedName());
499        }
500        regionLoads[regionIndex] = rl;
501      }
502
503      if (regionFinder != null) {
504        // region location
505        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
506        regionLocations[regionIndex] = new int[loc.size()];
507        for (int i = 0; i < loc.size(); i++) {
508          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
509              : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
510                  : serversToIndex.get(loc.get(i).getHostAndPort()));
511        }
512      }
513    }
514
    /**
     * Returns true iff a given server has fewer regions than the balanced amount
     */
518    public boolean serverHasTooFewRegions(int server) {
519      int minLoad = this.numRegions / numServers;
520      int numRegions = getNumRegions(server);
521      return numRegions < minLoad;
522    }
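    // Worked example (illustrative): with numRegions = 10 and numServers = 3, minLoad is 3,
    // so a server currently holding 2 regions is reported as having too few regions.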
523
    /**
     * Retrieves and lazily initializes a field storing the locality of
     * every region/rack combination
     */
528    public float[][] getOrComputeRackLocalities() {
529      if (rackLocalities == null || regionsToMostLocalEntities == null) {
530        computeCachedLocalities();
531      }
532      return rackLocalities;
533    }
534
    /**
     * Lazily initializes and retrieves a mapping of region -> server or rack (depending on
     * the given locality type) that has the highest locality for the region
     */
539    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
540      if (rackLocalities == null || regionsToMostLocalEntities == null) {
541        computeCachedLocalities();
542      }
543      return regionsToMostLocalEntities[type.ordinal()];
544    }
545
546    /**
547     * Looks up locality from cache of localities. Will create cache if it does
548     * not already exist.
549     */
550    public float getOrComputeLocality(int region, int entity, LocalityType type) {
551      switch (type) {
552        case SERVER:
553          return getLocalityOfRegion(region, entity);
554        case RACK:
555          return getOrComputeRackLocalities()[region][entity];
556        default:
557          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
558      }
559    }
560
561    /**
562     * Returns locality weighted by region size in MB. Will create locality cache
563     * if it does not already exist.
564     */
565    public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
566      return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
567    }
568
569    /**
570     * Returns the size in MB from the most recent RegionLoad for region
571     */
572    public int getRegionSizeMB(int region) {
573      Deque<BalancerRegionLoad> load = regionLoads[region];
      // This means the region has no actual data on disk
575      if (load == null) {
576        return 0;
577      }
578      return regionLoads[region].getLast().getStorefileSizeMB();
579    }
580
    /**
     * Computes and caches the locality for each region/rack combination,
     * as well as a mapping of region -> server and region -> rack such that the server
     * and rack have the highest locality for the region
     */
586    private void computeCachedLocalities() {
587      rackLocalities = new float[numRegions][numRacks];
588      regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
589
590      // Compute localities and find most local server per region
591      for (int region = 0; region < numRegions; region++) {
592        int serverWithBestLocality = 0;
593        float bestLocalityForRegion = 0;
594        for (int server = 0; server < numServers; server++) {
595          // Aggregate per-rack locality
596          float locality = getLocalityOfRegion(region, server);
597          int rack = serverIndexToRackIndex[server];
598          int numServersInRack = serversPerRack[rack].length;
599          rackLocalities[region][rack] += locality / numServersInRack;
600
601          if (locality > bestLocalityForRegion) {
602            serverWithBestLocality = server;
603            bestLocalityForRegion = locality;
604          }
605        }
606        regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
607
608        // Find most local rack per region
609        int rackWithBestLocality = 0;
610        float bestRackLocalityForRegion = 0.0f;
611        for (int rack = 0; rack < numRacks; rack++) {
612          float rackLocality = rackLocalities[region][rack];
613          if (rackLocality > bestRackLocalityForRegion) {
614            bestRackLocalityForRegion = rackLocality;
615            rackWithBestLocality = rack;
616          }
617        }
618        regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
619      }
620
621    }
622
623    /**
624     * Maps region index to rack index
625     */
626    public int getRackForRegion(int region) {
627      return serverIndexToRackIndex[regionIndexToServerIndex[region]];
628    }
629
630    enum LocalityType {
631      SERVER,
632      RACK
633    }
634
635    /** An action to move or swap a region */
636    public static class Action {
637      public enum Type {
638        ASSIGN_REGION,
639        MOVE_REGION,
640        SWAP_REGIONS,
641        NULL,
642      }
643
644      public Type type;
645      public Action (Type type) {this.type = type;}
646      /** Returns an Action which would undo this action */
647      public Action undoAction() { return this; }
648      @Override
649      public String toString() { return type + ":";}
650    }
651
652    public static class AssignRegionAction extends Action {
653      public int region;
654      public int server;
655      public AssignRegionAction(int region, int server) {
656        super(Type.ASSIGN_REGION);
657        this.region = region;
658        this.server = server;
659      }
660      @Override
661      public Action undoAction() {
662        // TODO implement this. This action is not being used by the StochasticLB for now
663        // in case it uses it, we should implement this function.
664        throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
665      }
666      @Override
667      public String toString() {
668        return type + ": " + region + ":" + server;
669      }
670    }
671
672    public static class MoveRegionAction extends Action {
673      public int region;
674      public int fromServer;
675      public int toServer;
676
677      public MoveRegionAction(int region, int fromServer, int toServer) {
678        super(Type.MOVE_REGION);
679        this.fromServer = fromServer;
680        this.region = region;
681        this.toServer = toServer;
682      }
683      @Override
684      public Action undoAction() {
685        return new MoveRegionAction (region, toServer, fromServer);
686      }
687      @Override
688      public String toString() {
689        return type + ": " + region + ":" + fromServer + " -> " + toServer;
690      }
691    }
692
693    public static class SwapRegionsAction extends Action {
694      public int fromServer;
695      public int fromRegion;
696      public int toServer;
697      public int toRegion;
698      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
699        super(Type.SWAP_REGIONS);
700        this.fromServer = fromServer;
701        this.fromRegion = fromRegion;
702        this.toServer = toServer;
703        this.toRegion = toRegion;
704      }
705      @Override
706      public Action undoAction() {
707        return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
708      }
709      @Override
710      public String toString() {
711        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
712      }
713    }
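    // Illustrative usage of the Action classes above (sketch; "cluster" is a hypothetical
    // Cluster instance): an action can be applied tentatively and rolled back with its
    // symmetric undo action, e.g. moving region 5 from server 0 to server 1:
    //   Action move = new MoveRegionAction(5, 0, 1);
    //   cluster.doAction(move);
    //   // ... evaluate the resulting cost ...
    //   cluster.doAction(move.undoAction()); // moves region 5 back from server 1 to server 0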
714
715    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION",
716        justification="Mistake. Too disruptive to change now")
717    public static final Action NullAction = new Action(Type.NULL);
718
719    public void doAction(Action action) {
720      switch (action.type) {
721      case NULL: break;
722      case ASSIGN_REGION:
723        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
724        assert action instanceof AssignRegionAction: action.getClass();
725        AssignRegionAction ar = (AssignRegionAction) action;
726        regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
727        regionMoved(ar.region, -1, ar.server);
728        break;
729      case MOVE_REGION:
730        assert action instanceof MoveRegionAction: action.getClass();
731        MoveRegionAction mra = (MoveRegionAction) action;
732        regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
733        regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
734        regionMoved(mra.region, mra.fromServer, mra.toServer);
735        break;
736      case SWAP_REGIONS:
737        assert action instanceof SwapRegionsAction: action.getClass();
738        SwapRegionsAction a = (SwapRegionsAction) action;
739        regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
740        regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
741        regionMoved(a.fromRegion, a.fromServer, a.toServer);
742        regionMoved(a.toRegion, a.toServer, a.fromServer);
743        break;
744      default:
        throw new RuntimeException("Unknown action: " + action.type);
746      }
747    }
748
749    /**
750     * Return true if the placement of region on server would lower the availability
751     * of the region in question
752     * @return true or false
753     */
754    boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
755      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
756        return false; // safeguard against race between cluster.servers and servers from LB method args
757      }
758      int server = serversToIndex.get(serverName.getHostAndPort());
759      int region = regionsToIndex.get(regionInfo);
760
      // Replicas of the same region should preferably be assigned to different servers
762      for (int i : regionsPerServer[server]) {
763        RegionInfo otherRegionInfo = regions[i];
764        if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
765          return true;
766        }
767      }
768
769      int primary = regionIndexToPrimaryIndex[region];
770      if (primary == -1) {
771        return false;
772      }
773      // there is a subset relation for server < host < rack
774      // check server first
775      if (contains(primariesOfRegionsPerServer[server], primary)) {
776        // check for whether there are other servers that we can place this region
777        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
778          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
779            return true; // meaning there is a better server
780          }
781        }
782        return false; // there is not a better server to place this
783      }
784
785      // check host
786      if (multiServersPerHost) {
787        // these arrays would only be allocated if we have more than one server per host
788        int host = serverIndexToHostIndex[server];
789        if (contains(primariesOfRegionsPerHost[host], primary)) {
790          // check for whether there are other hosts that we can place this region
791          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
792            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
793              return true; // meaning there is a better host
794            }
795          }
796          return false; // there is not a better host to place this
797        }
798      }
799
800      // check rack
801      if (numRacks > 1) {
802        int rack = serverIndexToRackIndex[server];
803        if (contains(primariesOfRegionsPerRack[rack], primary)) {
804          // check for whether there are other racks that we can place this region
805          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
806            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
807              return true; // meaning there is a better rack
808            }
809          }
810          return false; // there is not a better rack to place this
811        }
812      }
813
814      return false;
815    }
816
817    void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
818      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
819        return;
820      }
821      int server = serversToIndex.get(serverName.getHostAndPort());
822      int region = regionsToIndex.get(regionInfo);
823      doAction(new AssignRegionAction(region, server));
824    }
825
826    void regionMoved(int region, int oldServer, int newServer) {
827      regionIndexToServerIndex[region] = newServer;
828      if (initialRegionIndexToServerIndex[region] == newServer) {
829        numMovedRegions--; //region moved back to original location
830      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
831        numMovedRegions++; //region moved from original location
832      }
833      int tableIndex = regionIndexToTableIndex[region];
834      if (oldServer >= 0) {
835        numRegionsPerServerPerTable[oldServer][tableIndex]--;
836      }
837      numRegionsPerServerPerTable[newServer][tableIndex]++;
838
839      //check whether this caused maxRegionsPerTable in the new Server to be updated
840      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
841        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
842      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
843          == numMaxRegionsPerTable[tableIndex]) {
844        //recompute maxRegionsPerTable since the previous value was coming from the old server
845        numMaxRegionsPerTable[tableIndex] = 0;
846        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
847          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
848            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
849          }
850        }
851      }
852
853      // update for servers
854      int primary = regionIndexToPrimaryIndex[region];
855      if (oldServer >= 0) {
856        primariesOfRegionsPerServer[oldServer] = removeRegion(
857          primariesOfRegionsPerServer[oldServer], primary);
858      }
859      primariesOfRegionsPerServer[newServer] = addRegionSorted(
860        primariesOfRegionsPerServer[newServer], primary);
861
862      // update for hosts
863      if (multiServersPerHost) {
864        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
865        int newHost = serverIndexToHostIndex[newServer];
866        if (newHost != oldHost) {
867          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
868          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
869          if (oldHost >= 0) {
870            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
871            primariesOfRegionsPerHost[oldHost] = removeRegion(
872              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
873          }
874        }
875      }
876
877      // update for racks
878      if (numRacks > 1) {
879        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
880        int newRack = serverIndexToRackIndex[newServer];
881        if (newRack != oldRack) {
882          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
883          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
884          if (oldRack >= 0) {
885            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
886            primariesOfRegionsPerRack[oldRack] = removeRegion(
887              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
888          }
889        }
890      }
891    }
892
893    int[] removeRegion(int[] regions, int regionIndex) {
      //TODO: this may be costly. Consider using linked lists
895      int[] newRegions = new int[regions.length - 1];
896      int i = 0;
897      for (i = 0; i < regions.length; i++) {
898        if (regions[i] == regionIndex) {
899          break;
900        }
901        newRegions[i] = regions[i];
902      }
903      System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
904      return newRegions;
905    }
906
907    int[] addRegion(int[] regions, int regionIndex) {
908      int[] newRegions = new int[regions.length + 1];
909      System.arraycopy(regions, 0, newRegions, 0, regions.length);
910      newRegions[newRegions.length - 1] = regionIndex;
911      return newRegions;
912    }
913
914    int[] addRegionSorted(int[] regions, int regionIndex) {
915      int[] newRegions = new int[regions.length + 1];
916      int i = 0;
917      for (i = 0; i < regions.length; i++) { // find the index to insert
918        if (regions[i] > regionIndex) {
919          break;
920        }
921      }
922      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
923      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
924      newRegions[i] = regionIndex;
925
926      return newRegions;
927    }
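    // Example (illustrative): addRegionSorted(new int[] {2, 5, 9}, 7) returns {2, 5, 7, 9};
    // the primaries arrays are kept sorted this way so that contains() can use binary search.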
928
929    int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
930      int i = 0;
931      for (i = 0; i < regions.length; i++) {
932        if (regions[i] == regionIndex) {
933          regions[i] = newRegionIndex;
934          break;
935        }
936      }
937      return regions;
938    }
939
940    void sortServersByRegionCount() {
941      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
942    }
943
944    int getNumRegions(int server) {
945      return regionsPerServer[server].length;
946    }
947
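    /** Note: {@code arr} must be sorted, since the lookup relies on Arrays.binarySearch. */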
948    boolean contains(int[] arr, int val) {
949      return Arrays.binarySearch(arr, val) >= 0;
950    }
951
952    private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
953
954    int getLowestLocalityRegionOnServer(int serverIndex) {
955      if (regionFinder != null) {
956        float lowestLocality = 1.0f;
957        int lowestLocalityRegionIndex = -1;
958        if (regionsPerServer[serverIndex].length == 0) {
959          // No regions on that region server
960          return -1;
961        }
962        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
963          int regionIndex = regionsPerServer[serverIndex][j];
964          HDFSBlocksDistribution distribution = regionFinder
965              .getBlockDistribution(regions[regionIndex]);
966          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
967          // skip empty region
968          if (distribution.getUniqueBlocksTotalWeight() == 0) {
969            continue;
970          }
971          if (locality < lowestLocality) {
972            lowestLocality = locality;
973            lowestLocalityRegionIndex = j;
974          }
975        }
976        if (lowestLocalityRegionIndex == -1) {
977          return -1;
978        }
979        if (LOG.isTraceEnabled()) {
980          LOG.trace("Lowest locality region is "
981              + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
982                  .getRegionNameAsString() + " with locality " + lowestLocality
983              + " and its region server contains " + regionsPerServer[serverIndex].length
984              + " regions");
985        }
986        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
987      } else {
988        return -1;
989      }
990    }
991
992    float getLocalityOfRegion(int region, int server) {
993      if (regionFinder != null) {
994        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
995        return distribution.getBlockLocalityIndex(servers[server].getHostname());
996      } else {
997        return 0f;
998      }
999    }
1000
1001    protected void setNumRegions(int numRegions) {
1002      this.numRegions = numRegions;
1003    }
1004
1005    protected void setNumMovedRegions(int numMovedRegions) {
1006      this.numMovedRegions = numMovedRegions;
1007    }
1008
1009    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
1010        justification="Not important but should be fixed")
1011    @Override
1012    public String toString() {
1013      StringBuilder desc = new StringBuilder("Cluster={servers=[");
1014      for(ServerName sn:servers) {
1015        desc.append(sn.getHostAndPort()).append(", ");
1016      }
1017      desc.append("], serverIndicesSortedByRegionCount=")
1018          .append(Arrays.toString(serverIndicesSortedByRegionCount))
1019          .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));
1020
1021      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
1022          .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
1023          .append(", numTables=").append(numTables).append(", numMovedRegions=")
1024          .append(numMovedRegions).append('}');
1025      return desc.toString();
1026    }
1027  }
1028
1029  // slop for regions
1030  protected float slop;
1031  // overallSlop to control simpleLoadBalancer's cluster level threshold
1032  protected float overallSlop;
1033  protected Configuration config = HBaseConfiguration.create();
1034  protected RackManager rackManager;
1035  private static final Random RANDOM = new Random(System.currentTimeMillis());
1036  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
1037  protected MetricsBalancer metricsBalancer = null;
1038  protected ClusterMetrics clusterStatus = null;
1039  protected ServerName masterServerName;
1040  protected MasterServices services;
1041
1042  /**
1043   * @deprecated since 2.4.0, will be removed in 3.0.0.
1044   * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a>
1045   */
1046  @Deprecated
1047  protected boolean onlySystemTablesOnMaster;
1048
1049  protected boolean maintenanceMode;
1050
1051  @Override
1052  public void setConf(Configuration conf) {
1053    this.config = conf;
1054    setSlop(conf);
1055    if (slop < 0) slop = 0;
1056    else if (slop > 1) slop = 1;
1057
1058    if (overallSlop < 0) overallSlop = 0;
1059    else if (overallSlop > 1) overallSlop = 1;
1060
1061    this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config);
1062
1063    this.rackManager = new RackManager(getConf());
1064    if (useRegionFinder) {
1065      regionFinder.setConf(conf);
1066    }
1067    this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
    // Print out base configs. Don't print overallSlop since it is for the simple balancer exclusively.
1069    LOG.info("slop={}, systemTablesOnMaster={}",
1070        this.slop, this.onlySystemTablesOnMaster);
1071  }
1072
1073  protected void setSlop(Configuration conf) {
1074    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
1075    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
1076  }
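  // Illustrative configuration sketch (assumed example values): a per-server slop of 0.2 and
  // a cluster-wide slop of 0.3 for the simple balancer could be configured as
  //   conf.setFloat("hbase.regions.slop", 0.2f);
  //   conf.setFloat("hbase.regions.overallSlop", 0.3f);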
1077
1078  /**
1079   * Check if a region belongs to some system table.
1080   * If so, the primary replica may be expected to be put on the master regionserver.
1081   *
1082   * @deprecated since 2.4.0, will be removed in 3.0.0.
1083   * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a>
1084   */
1085  @Deprecated
1086  public boolean shouldBeOnMaster(RegionInfo region) {
1087    return (this.maintenanceMode || this.onlySystemTablesOnMaster)
1088        && region.getTable().isSystemTable();
1089  }
1090
1091  /**
1092   * Balance the regions that should be on master regionserver.
1093   *
1094   * @deprecated since 2.4.0, will be removed in 3.0.0.
1095   * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a>
1096   */
1097  @Deprecated
1098  protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) {
1099    if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null;
1100    List<RegionPlan> plans = null;
1101    List<RegionInfo> regions = clusterMap.get(masterServerName);
1102    if (regions != null) {
1103      Iterator<ServerName> keyIt = null;
1104      for (RegionInfo region: regions) {
1105        if (shouldBeOnMaster(region)) continue;
1106
1107        // Find a non-master regionserver to host the region
1108        if (keyIt == null || !keyIt.hasNext()) {
1109          keyIt = clusterMap.keySet().iterator();
1110        }
1111        ServerName dest = keyIt.next();
1112        if (masterServerName.equals(dest)) {
1113          if (!keyIt.hasNext()) {
1114            keyIt = clusterMap.keySet().iterator();
1115          }
1116          dest = keyIt.next();
1117        }
1118
1119        // Move this region away from the master regionserver
1120        RegionPlan plan = new RegionPlan(region, masterServerName, dest);
1121        if (plans == null) {
1122          plans = new ArrayList<>();
1123        }
1124        plans.add(plan);
1125      }
1126    }
1127    for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) {
1128      if (masterServerName.equals(server.getKey())) continue;
1129      for (RegionInfo region: server.getValue()) {
1130        if (!shouldBeOnMaster(region)) continue;
1131
1132        // Move this region to the master regionserver
1133        RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName);
1134        if (plans == null) {
1135          plans = new ArrayList<>();
1136        }
1137        plans.add(plan);
1138      }
1139    }
1140    return plans;
1141  }
1142
1143  /**
   * If the master is configured to carry system tables only, this is
   * where we figure out what to assign to it.
1146   *
1147   * @deprecated since 2.4.0, will be removed in 3.0.0.
1148   * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a>
1149   */
1150  @Deprecated
1151  @NonNull
1152  protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions(
1153      Collection<RegionInfo> regions, List<ServerName> servers) {
1154    Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
1155    if (this.maintenanceMode || this.onlySystemTablesOnMaster) {
1156      if (masterServerName != null && servers.contains(masterServerName)) {
1157        assignments.put(masterServerName, new ArrayList<>());
1158        for (RegionInfo region : regions) {
1159          if (shouldBeOnMaster(region)) {
1160            assignments.get(masterServerName).add(region);
1161          }
1162        }
1163      }
1164    }
1165    return assignments;
1166  }
1167
1168  @Override
1169  public Configuration getConf() {
1170    return this.config;
1171  }
1172
1173  @Override
1174  public synchronized void setClusterMetrics(ClusterMetrics st) {
1175    this.clusterStatus = st;
1176    if (useRegionFinder) {
1177      regionFinder.setClusterMetrics(st);
1178    }
1179  }
1180
1181
1182  @Override
1183  public void setMasterServices(MasterServices masterServices) {
1184    masterServerName = masterServices.getServerName();
1185    this.services = masterServices;
1186    if (useRegionFinder) {
1187      this.regionFinder.setServices(masterServices);
1188    }
1189    if (this.services.isInMaintenanceMode()) {
1190      this.maintenanceMode = true;
1191    }
1192  }
1193
1194  @Override
1195  public void postMasterStartupInitialize() {
1196    if (services != null && regionFinder != null) {
1197      try {
1198        Set<RegionInfo> regions =
1199            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
1200        regionFinder.refreshAndWait(regions);
1201      } catch (Exception e) {
1202        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
1203      }
1204    }
1205  }
1206
1207  public void setRackManager(RackManager rackManager) {
1208    this.rackManager = rackManager;
1209  }
1210
1211  protected boolean needsBalance(TableName tableName, Cluster c) {
1212    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
1213    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
1214      if (LOG.isDebugEnabled()) {
1215        LOG.debug("Not running balancer because only " + cs.getNumServers()
1216            + " active regionserver(s)");
1217      }
1218      return false;
1219    }
1220    if(areSomeRegionReplicasColocated(c)) return true;
1221    if(idleRegionServerExist(c)) {
1222      return true;
1223    }
1224
1225    // Check if we even need to do any load balancing
1226    // HBASE-3681 check sloppiness first
1227    float average = cs.getLoadAverage(); // for logging
1228    int floor = (int) Math.floor(average * (1 - slop));
1229    int ceiling = (int) Math.ceil(average * (1 + slop));
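    // Worked example (illustrative): with average = 10 and slop = 0.2, floor = 8 and
    // ceiling = 12, so balancing proceeds only if some server holds more than 12 or
    // fewer than 8 regions.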
1230    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
1231      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
1232      if (LOG.isTraceEnabled()) {
1233        // If nothing to balance, then don't say anything unless trace-level logging.
1234        LOG.trace("Skipping load balancing because balanced cluster; " +
1235          "servers=" + cs.getNumServers() +
1236          " regions=" + cs.getNumRegions() + " average=" + average +
1237          " mostloaded=" + serversByLoad.lastKey().getLoad() +
1238          " leastloaded=" + serversByLoad.firstKey().getLoad());
1239      }
1240      return false;
1241    }
1242    return true;
1243  }
1244
1245  /**
   * Subclasses should implement this to return true if the cluster has nodes that host
   * multiple replicas of the same region, or if there are multiple racks and the same
   * rack hosts replicas of the same region
1249   * @param c Cluster information
1250   * @return whether region replicas are currently co-located
1251   */
1252  protected boolean areSomeRegionReplicasColocated(Cluster c) {
1253    return false;
1254  }
1255
1256  protected final boolean idleRegionServerExist(Cluster c){
1257    boolean isServerExistsWithMoreRegions = false;
1258    boolean isServerExistsWithZeroRegions = false;
1259    for (int[] serverList: c.regionsPerServer){
1260      if (serverList.length > 1) {
1261        isServerExistsWithMoreRegions = true;
1262      }
1263      if (serverList.length == 0) {
1264        isServerExistsWithZeroRegions = true;
1265      }
1266    }
1267    return isServerExistsWithMoreRegions && isServerExistsWithZeroRegions;
1268  }
1269
1270  /**
1271   * Generates a bulk assignment plan to be used on cluster startup using a
1272   * simple round-robin assignment.
1273   * <p>
1274   * Takes a list of all the regions and all the servers in the cluster and
1275   * returns a map of each server to the regions that it should be assigned.
1276   * <p>
1277   * Currently implemented as a round-robin assignment. Same invariant as load
1278   * balancing, all servers holding floor(avg) or ceiling(avg).
1279   *
1280   * TODO: Use block locations from HDFS to place regions with their blocks
1281   *
1282   * @param regions all regions
1283   * @param servers all servers
1284   * @return map of server to the regions it should take, or emptyMap if no
1285   *         assignment is possible (ie. no servers)
1286   */
1287  @Override
1288  @NonNull
1289  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
1290      List<ServerName> servers) throws HBaseIOException {
1291    metricsBalancer.incrMiscInvocations();
1292    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
1293    if (!assignments.isEmpty()) {
1294      servers = new ArrayList<>(servers);
1295      // Guarantee not to put other regions on master
1296      servers.remove(masterServerName);
1297      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1298      if (!masterRegions.isEmpty()) {
1299        regions = new ArrayList<>(regions);
1300        regions.removeAll(masterRegions);
1301      }
1302    }
    /**
     * In maintenance mode, only the system tables need to be assigned.
     */
1306    if (this.maintenanceMode || regions.isEmpty()) {
1307      return assignments;
1308    }
1309
1310    int numServers = servers == null ? 0 : servers.size();
1311    if (numServers == 0) {
1312      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
1313      return Collections.emptyMap();
1314    }
1315
1316    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
1317    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
1318    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
1319    // and balanced. This should also run fast with fewer number of iterations.
1320
1321    if (numServers == 1) { // Only one server, nothing fancy we can do here
1322      ServerName server = servers.get(0);
1323      assignments.put(server, new ArrayList<>(regions));
1324      return assignments;
1325    }
1326
1327    Cluster cluster = createCluster(servers, regions);
1328    roundRobinAssignment(cluster, regions, servers, assignments);
1329    return assignments;
1330  }
1331
1332  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions)
1333      throws HBaseIOException {
1334    boolean hasRegionReplica = false;
1335    try {
1336      if (services != null && services.getTableDescriptors() != null) {
1337        Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
1338        for (RegionInfo regionInfo : regions) {
1339          TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
1340          if (td != null && td.getRegionReplication() > 1) {
1341            hasRegionReplica = true;
1342            break;
1343          }
1344        }
1345      }
1346    } catch (IOException ioe) {
1347      throw new HBaseIOException(ioe);
1348    }
1349
1350    // Get the snapshot of the current assignments for the regions in question, and then create
1351    // a cluster out of it. Note that we might have replicas already assigned to some servers
1352    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
1353    // replicas of the regions that are passed (for performance).
1354    Map<ServerName, List<RegionInfo>> clusterState = null;
1355    if (!hasRegionReplica) {
1356      clusterState = getRegionAssignmentsByServer(regions);
1357    } else {
      // When region replicas are in use, take a snapshot of the entire cluster so that
      // existing replica placements are visible.
1359      clusterState = getRegionAssignmentsByServer(null);
1360    }
1361
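    // Make sure every candidate server has an entry in the snapshot, even if it currently
    // hosts none of the regions of interest, so it can still receive new assignments.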
1362    for (ServerName server : servers) {
1363      if (!clusterState.containsKey(server)) {
1364        clusterState.put(server, EMPTY_REGION_LIST);
1365      }
1366    }
1367    return new Cluster(regions, clusterState, null, this.regionFinder,
1368        rackManager);
1369  }
1370
1371  private List<ServerName> findIdleServers(List<ServerName> servers) {
1372    return this.services.getServerManager()
1373            .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
1374  }
1375
1376  /**
1377   * Used to assign a single region to a random server.
1378   */
1379  @Override
1380  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
1381      throws HBaseIOException {
1382    metricsBalancer.incrMiscInvocations();
1383    if (servers != null && servers.contains(masterServerName)) {
1384      if (shouldBeOnMaster(regionInfo)) {
1385        return masterServerName;
1386      }
1387      if (!LoadBalancer.isTablesOnMaster(getConf())) {
1388        // Guarantee we do not put any regions on master
1389        servers = new ArrayList<>(servers);
1390        servers.remove(masterServerName);
1391      }
1392    }
1393
1394    int numServers = servers == null ? 0 : servers.size();
1395    if (numServers == 0) {
      LOG.warn("Wanted to do random assignment but no servers to assign to");
1397      return null;
1398    }
1399    if (numServers == 1) { // Only one server, nothing fancy we can do here
1400      return servers.get(0);
1401    }
1402    List<ServerName> idleServers = findIdleServers(servers);
1403    if (idleServers.size() == 1) {
1404      return idleServers.get(0);
1405    }
    final List<ServerName> finalServers = idleServers.isEmpty() ? servers : idleServers;
1408    List<RegionInfo> regions = Lists.newArrayList(regionInfo);
1409    Cluster cluster = createCluster(finalServers, regions);
1410    return randomAssignment(cluster, regionInfo, finalServers);
1411  }
1412
1413  /**
1414   * Generates a bulk assignment startup plan, attempting to reuse the existing
1415   * assignment information from META, but adjusting for the specified list of
   * servers currently available/online for assignment.
1417   * <p>
1418   * Takes a map of all regions to their existing assignment from META. Also
1419   * takes a list of online servers for regions to be assigned to. Attempts to
   * retain all assignments, so in some instances the initial assignment will not be
   * completely balanced.
1422   * <p>
1423   * Any leftover regions without an existing server to be assigned to will be
1424   * assigned randomly to available servers.
1425   *
1426   * @param regions regions and existing assignment from meta
1427   * @param servers available servers
   * @return map of servers and regions to be assigned to them, or an empty map if no
   *         assignment is possible (i.e. no servers)
1430   */
1431  @Override
1432  @NonNull
1433  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
1434      List<ServerName> servers) throws HBaseIOException {
1435    // Update metrics
1436    metricsBalancer.incrMiscInvocations();
    Map<ServerName, List<RegionInfo>> assignments =
        assignMasterSystemRegions(regions.keySet(), servers);
1438    if (!assignments.isEmpty()) {
1439      servers = new ArrayList<>(servers);
1440      // Guarantee not to put other regions on master
1441      servers.remove(masterServerName);
1442      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1443      regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey()))
1444          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
1445    }
1446    if (this.maintenanceMode || regions.isEmpty()) {
1447      return assignments;
1448    }
1449
1450    int numServers = servers == null ? 0 : servers.size();
1451    if (numServers == 0) {
1452      LOG.warn("Wanted to do retain assignment but no servers to assign to");
1453      return Collections.emptyMap();
1454    }
1455    if (numServers == 1) { // Only one server, nothing fancy we can do here
1456      ServerName server = servers.get(0);
1457      assignments.put(server, new ArrayList<>(regions.keySet()));
1458      return assignments;
1459    }
1460
1461    // Group all of the old assignments by their hostname.
1462    // We can't group directly by ServerName since the servers all have
1463    // new start-codes.
1464
1465    // Group the servers by their hostname. It's possible we have multiple
1466    // servers on the same host on different ports.
1467    ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
1468    for (ServerName server : servers) {
1469      assignments.put(server, new ArrayList<>());
1470      serversByHostname.put(server.getHostnameLowerCase(), server);
1471    }
1472
1473    // Collection of the hostnames that used to have regions
1474    // assigned, but for which we no longer have any RS running
1475    // after the cluster restart.
1476    Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
1477
    // If the old servers aren't present, let's assign those regions later.
1479    List<RegionInfo> randomAssignRegions = Lists.newArrayList();
1480
1481    int numRandomAssignments = 0;
    int numRetainedAssignments = 0;
1483    for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
1484      RegionInfo region = entry.getKey();
1485      ServerName oldServerName = entry.getValue();
1486      List<ServerName> localServers = new ArrayList<>();
1487      if (oldServerName != null) {
1488        localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
1489      }
1490      if (localServers.isEmpty()) {
1491        // No servers on the new cluster match up with this hostname, assign randomly, later.
1492        randomAssignRegions.add(region);
1493        if (oldServerName != null) {
1494          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
1495        }
1496      } else if (localServers.size() == 1) {
1497        // the usual case - one new server on same host
1498        ServerName target = localServers.get(0);
1499        assignments.get(target).add(region);
        numRetainedAssignments++;
1501      } else {
1502        // multiple new servers in the cluster on this same host
1503        if (localServers.contains(oldServerName)) {
1504          assignments.get(oldServerName).add(region);
          numRetainedAssignments++;
1506        } else {
1507          ServerName target = null;
1508          for (ServerName tmp : localServers) {
1509            if (tmp.getPort() == oldServerName.getPort()) {
1510              target = tmp;
1511              assignments.get(tmp).add(region);
              numRetainedAssignments++;
1513              break;
1514            }
1515          }
1516          if (target == null) {
1517            randomAssignRegions.add(region);
1518          }
1519        }
1520      }
1521    }
1522
    // If servers from the prior assignment aren't present, assign those regions randomly.
1524    if (randomAssignRegions.size() > 0) {
1525      Cluster cluster = createCluster(servers, regions.keySet());
1526      for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
1527        ServerName sn = entry.getKey();
1528        for (RegionInfo region : entry.getValue()) {
1529          cluster.doAssignRegion(region, sn);
1530        }
1531      }
1532      for (RegionInfo region : randomAssignRegions) {
1533        ServerName target = randomAssignment(cluster, region, servers);
1534        assignments.get(target).add(region);
1535        numRandomAssignments++;
1536      }
1537    }
1538
1539    String randomAssignMsg = "";
1540    if (numRandomAssignments > 0) {
1541      randomAssignMsg =
1542          numRandomAssignments + " regions were assigned "
1543              + "to random hosts, since the old hosts for these regions are no "
1544              + "longer present in the cluster. These hosts were:\n  "
1545              + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
1546    }
1547
    LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssignments
1549        + " retained the pre-restart assignment. " + randomAssignMsg);
1550    return assignments;
1551  }
1552
1553  @Override
  public void initialize() throws HBaseIOException {
1555  }
1556
1557  @Override
1558  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
1559  }
1560
1561  @Override
1562  public void regionOffline(RegionInfo regionInfo) {
1563  }
1564
1565  @Override
1566  public boolean isStopped() {
1567    return stopped;
1568  }
1569
1570  @Override
1571  public void stop(String why) {
    LOG.info("Load Balancer stop requested: " + why);
1573    stopped = true;
1574  }
1575
1576  /**
1577  * Updates the balancer status tag reported to JMX
1578  */
1579  public void updateBalancerStatus(boolean status) {
1580    metricsBalancer.balancerStatus(status);
1581  }
1582
1583  /**
1584   * Used to assign a single region to a random server.
1585   */
1586  private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
1587      List<ServerName> servers) {
1588    int numServers = servers.size(); // servers is not null, numServers > 1
1589    ServerName sn = null;
1590    final int maxIterations = numServers * 4;
1591    int iterations = 0;
1592    List<ServerName> usedSNs = new ArrayList<>(servers.size());
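    // Randomly probe servers until one is found that would not lower availability (e.g. by
    // co-hosting replicas of the same region), giving up after maxIterations attempts;
    // servers already probed are remembered in usedSNs.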
1593    do {
1594      int i = RANDOM.nextInt(numServers);
1595      sn = servers.get(i);
1596      if (!usedSNs.contains(sn)) {
1597        usedSNs.add(sn);
1598      }
1599    } while (cluster.wouldLowerAvailability(regionInfo, sn)
1600        && iterations++ < maxIterations);
1601    if (iterations >= maxIterations) {
      // We have reached the max number of iterations; every server tried so far would
      // still lower availability.
1604      for (ServerName unusedServer : servers) {
1605        if (!usedSNs.contains(unusedServer)) {
          // Check whether any as-yet-untried server keeps availability; if so use it,
          // otherwise there is no option but to go with one of the servers already tried.
1608          if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
1609            sn = unusedServer;
1610            break;
1611          }
1612        }
1613      }
1614    }
1615    cluster.doAssignRegion(regionInfo, sn);
1616    return sn;
1617  }
1618
1619  /**
1620   * Round robin a list of regions to a list of servers
1621   */
1622  private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
1623    List<ServerName> servers, Map<ServerName, List<RegionInfo>> assignments) {
1624    List<RegionInfo> unassignedRegions = new ArrayList<>();
1625    int numServers = servers.size();
1626    int numRegions = regions.size();
1627    int max = (int) Math.ceil((float) numRegions / numServers);
1628    int serverIdx = 0;
1629    if (numServers > 1) {
1630      serverIdx = RANDOM.nextInt(numServers);
1631    }
1632    int regionIdx = 0;
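    // First pass: stride through the region list, handing the j-th server every
    // numServers-th region starting at offset j. Regions whose placement here would lower
    // availability (e.g. co-locate replicas) are deferred to a second pass.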
1633    for (int j = 0; j < numServers; j++) {
1634      ServerName server = servers.get((j + serverIdx) % numServers);
1635      List<RegionInfo> serverRegions = new ArrayList<>(max);
1636      for (int i = regionIdx; i < numRegions; i += numServers) {
1637        RegionInfo region = regions.get(i % numRegions);
1638        if (cluster.wouldLowerAvailability(region, server)) {
1639          unassignedRegions.add(region);
1640        } else {
1641          serverRegions.add(region);
1642          cluster.doAssignRegion(region, server);
1643        }
1644      }
1645      assignments.put(server, serverRegions);
1646      regionIdx++;
1647    }
1648
1650    List<RegionInfo> lastFewRegions = new ArrayList<>();
    // Second pass: walk the deferred regions and try the servers one by one.
1652    serverIdx = RANDOM.nextInt(numServers);
1653    OUTER : for (RegionInfo region : unassignedRegions) {
1654      boolean assigned = false;
1655      INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
1656        ServerName server = servers.get((j + serverIdx) % numServers);
1657        if (cluster.wouldLowerAvailability(region, server)) {
1658          continue INNER;
1659        } else {
1660          assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
1661          cluster.doAssignRegion(region, server);
          serverIdx = (j + serverIdx + 1) % numServers; // resume from the next server
1663          assigned = true;
1664          break;
1665        }
1666      }
1667      if (!assigned) {
1668        lastFewRegions.add(region);
1669      }
1670    }
    // Just sprinkle the rest of the regions on random regionservers. The balanceCluster
    // will make it optimal later. We can end up with this if numReplicas > numServers.
1673    for (RegionInfo region : lastFewRegions) {
1674      int i = RANDOM.nextInt(numServers);
1675      ServerName server = servers.get(i);
1676      assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
1677      cluster.doAssignRegion(region, server);
1678    }
1679  }
1680
1681  protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
1682    Collection<RegionInfo> regions) {
1683    if (this.services != null && this.services.getAssignmentManager() != null) {
1684      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
1685    } else {
1686      return new HashMap<>();
1687    }
1688  }
1689
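  /**
   * Merges the per-table load maps into a single map of every server to all of the regions
   * it hosts, so the cluster can be balanced as one ensemble table.
   */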
  private Map<ServerName, List<RegionInfo>> toEnsembleTableLoad(
      Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
    Map<ServerName, List<RegionInfo>> returnMap = new TreeMap<>();
    for (Map<ServerName, List<RegionInfo>> serverNameListMap : loadOfAllTable.values()) {
1694      serverNameListMap.forEach((serverName, regionInfoList) -> {
1695        List<RegionInfo> regionInfos =
1696            returnMap.computeIfAbsent(serverName, k -> new ArrayList<>());
1697        regionInfos.addAll(regionInfoList);
1698      });
1699    }
1700    return returnMap;
1701  }
1702
1703  @Override
1704  public abstract List<RegionPlan> balanceTable(TableName tableName,
1705      Map<ServerName, List<RegionInfo>> loadOfOneTable);
1706
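  /**
   * Balances the regions of the whole cluster. When per-table balancing is enabled
   * ({@code isByTable}), each table is balanced independently and the partial plans are
   * concatenated; otherwise the loads of all tables are merged and balanced together as
   * the ensemble table.
   */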
1707  @Override
1708  public List<RegionPlan>
1709      balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
1710    if (isByTable) {
1711      List<RegionPlan> result = new ArrayList<>();
1712      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
        LOG.info("Start generating balance plan for table " + tableName);
1714        List<RegionPlan> partialPlans = balanceTable(tableName, loadOfOneTable);
1715        if (partialPlans != null) {
1716          result.addAll(partialPlans);
1717        }
1718      });
1719      return result;
1720    } else {
      LOG.info("Start generating balance plan for the cluster.");
      return balanceTable(HConstants.ENSEMBLE_TABLE_NAME, toEnsembleTableLoad(loadOfAllTable));
1723    }
1724  }
1725
1726  @Override
1727  public void onConfigurationChange(Configuration conf) {
1728  }
1729}