001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.balancer;
020
021import edu.umd.cs.findbugs.annotations.NonNull;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.Comparator;
028import java.util.Deque;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033import java.util.Map.Entry;
034import java.util.NavigableMap;
035import java.util.Random;
036import java.util.Set;
037import java.util.TreeMap;
038import java.util.function.Predicate;
039import java.util.stream.Collectors;
040import org.apache.commons.lang3.NotImplementedException;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.ClusterMetrics;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseIOException;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HDFSBlocksDistribution;
047import org.apache.hadoop.hbase.ServerMetrics;
048import org.apache.hadoop.hbase.ServerName;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.RegionReplicaUtil;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.master.LoadBalancer;
054import org.apache.hadoop.hbase.master.MasterServices;
055import org.apache.hadoop.hbase.master.RackManager;
056import org.apache.hadoop.hbase.master.RegionPlan;
057import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
058import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
059import org.apache.hadoop.hbase.net.Address;
060import org.apache.yetus.audience.InterfaceAudience;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
065import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
066import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
067import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
068
069/**
070 * The base class for load balancers. It provides the the functions used to by
071 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
072 * in the edge cases. It doesn't provide an implementation of the
073 * actual balancing algorithm.
074 *
075 */
076@InterfaceAudience.Private
077public abstract class BaseLoadBalancer implements LoadBalancer {
078
  // Config key toggling the ring-buffer that records balancer decision history.
  public static final String BALANCER_DECISION_BUFFER_ENABLED =
    "hbase.master.balancer.decision.buffer.enabled";
  public static final boolean DEFAULT_BALANCER_DECISION_BUFFER_ENABLED = false;

  // Minimum number of live servers required before balancing is attempted.
  protected static final int MIN_SERVER_BALANCE = 2;
  // Set once the balancer is stopped; volatile so any thread observes the change.
  private volatile boolean stopped = false;

  // Shared immutable empty list used when callers pass no unassigned regions.
  private static final List<RegionInfo> EMPTY_REGION_LIST = Collections.emptyList();

  // Matches servers that currently host no regions at all.
  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
    = load -> load.getRegionMetrics().isEmpty();

  // Optional HDFS-locality lookup; only created when locality is enabled (see createRegionFinder).
  protected RegionLocationFinder regionFinder;
  protected boolean useRegionFinder;
  // Whether balancing is performed per-table rather than across the whole cluster.
  protected boolean isByTable = false;

  /**
   * Use to add balancer decision history to ring-buffer
   */
  protected NamedQueueRecorder namedQueueRecorder;
099
  /**
   * Fallback {@link RackManager} used when the caller supplies none: every
   * server is reported as being on the unknown rack.
   */
  private static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      return UNKNOWN_RACK;
    }
  }
106
107  /**
108   * The constructor that uses the basic MetricsBalancer
109   */
110  protected BaseLoadBalancer() {
111    metricsBalancer = new MetricsBalancer();
112    createRegionFinder();
113  }
114
115  /**
116   * This Constructor accepts an instance of MetricsBalancer,
117   * which will be used instead of creating a new one
118   */
119  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
120    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
121    createRegionFinder();
122  }
123
124  private void createRegionFinder() {
125    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
126    if (useRegionFinder) {
127      regionFinder = new RegionLocationFinder();
128    }
129  }
130
131  /**
132   * An efficient array based implementation similar to ClusterState for keeping
133   * the status of the cluster in terms of region assignment and distribution.
134   * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
135   * hundreds of thousands of hashmap manipulations are very costly, which is why this
136   * class uses mostly indexes and arrays.
137   *
138   * Cluster tracks a list of unassigned regions, region assignments, and the server
139   * topology in terms of server names, hostnames and racks.
140   */
141  protected static class Cluster {
    ServerName[] servers;
    String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
    String[] racks;
    boolean multiServersPerHost = false; // whether or not any host has more than one server

    ArrayList<String> tables;
    RegionInfo[] regions;
    Deque<BalancerRegionLoad>[] regionLoads; // regionIndex -> load history (most recent last)
    private RegionLocationFinder regionFinder;

    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality

    int[]   serverIndexToHostIndex;      //serverIndex -> host index
    int[]   serverIndexToRackIndex;      //serverIndex -> rack index

    int[][] regionsPerServer;            //serverIndex -> region list
    int[]   serverIndexToRegionsOffset;  //serverIndex -> offset of region list
    int[][] regionsPerHost;              //hostIndex -> list of regions
    int[][] regionsPerRack;              //rackIndex -> region list
    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerHost;   //hostIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerRack;   //rackIndex -> sorted list of regions by primary region index

    int[][] serversPerHost;              //hostIndex -> list of server indexes
    int[][] serversPerRack;              //rackIndex -> list of server indexes
    int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
    int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
    int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
    int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
    int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
    boolean hasRegionReplicas = false;   //whether there is regions with replicas

    // Boxed so Comparator-based sorts can reorder these index permutations in place.
    Integer[] serverIndicesSortedByRegionCount;
    Integer[] serverIndicesSortedByLocality;

    // Reverse lookups from an entity to its dense index in the arrays above.
    Map<Address, Integer> serversToIndex;
    Map<String, Integer> hostsToIndex;
    Map<String, Integer> racksToIndex;
    Map<String, Integer> tablesToIndex;
    Map<RegionInfo, Integer> regionsToIndex;
    float[] localityPerServer;

    // Cardinalities of each index space; fixed once the constructor finishes.
    int numServers;
    int numHosts;
    int numRacks;
    int numTables;
    int numRegions;

    int numMovedRegions = 0; //num moved regions from the initial configuration
    Map<ServerName, List<RegionInfo>> clusterState;

    protected final RackManager rackManager;
    // Maps region -> rackIndex -> locality of region on rack
    private float[][] rackLocalities;
    // Maps localityType -> region -> [server|rack]Index with highest locality
    private int[][] regionsToMostLocalEntities;
199
    /**
     * Convenience constructor for a cluster snapshot with no unassigned regions;
     * delegates to the full constructor with a null unassigned-region collection.
     */
    protected Cluster(
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {
      this(null, clusterState, loads, regionFinder, rackManager);
    }
207
    /**
     * Builds the array-based snapshot of the cluster: assigns dense indexes to
     * servers, hosts, racks, tables and regions, then fills every derived
     * mapping (regions per server/host/rack, primary-replica indexes, per-table
     * counts). Unassigned regions are registered with server index -1.
     * @param unassignedRegions regions with no current server; null means none
     * @param clusterState current assignment: server -> regions it hosts
     * @param loads per-region load history, keyed by region name
     * @param regionFinder optional HDFS-locality source; may be null
     * @param rackManager rack lookup; a default (single unknown rack) is used when null
     */
    @SuppressWarnings("unchecked")
    protected Cluster(
        Collection<RegionInfo> unassignedRegions,
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {

      if (unassignedRegions == null) {
        unassignedRegions = EMPTY_REGION_LIST;
      }

      serversToIndex = new HashMap<>();
      hostsToIndex = new HashMap<>();
      racksToIndex = new HashMap<>();
      tablesToIndex = new HashMap<>();

      //TODO: We should get the list of tables from master
      tables = new ArrayList<>();
      this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

      numRegions = 0;

      List<List<Integer>> serversPerHostList = new ArrayList<>();
      List<List<Integer>> serversPerRackList = new ArrayList<>();
      this.clusterState = clusterState;
      this.regionFinder = regionFinder;

      // Use servername and port as there can be dead servers in this list. We want everything with
      // a matching hostname and port to have the same index.
      for (ServerName sn : clusterState.keySet()) {
        if (sn == null) {
          LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " +
              "skipping; unassigned regions?");
          if (LOG.isTraceEnabled()) {
            LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
          }
          continue;
        }
        if (serversToIndex.get(sn.getAddress()) == null) {
          serversToIndex.put(sn.getAddress(), numServers++);
        }
        if (!hostsToIndex.containsKey(sn.getHostname())) {
          hostsToIndex.put(sn.getHostname(), numHosts++);
          serversPerHostList.add(new ArrayList<>(1));
        }

        int serverIndex = serversToIndex.get(sn.getAddress());
        int hostIndex = hostsToIndex.get(sn.getHostname());
        serversPerHostList.get(hostIndex).add(serverIndex);

        String rack = this.rackManager.getRack(sn);
        if (!racksToIndex.containsKey(rack)) {
          racksToIndex.put(rack, numRacks++);
          serversPerRackList.add(new ArrayList<>());
        }
        int rackIndex = racksToIndex.get(rack);
        serversPerRackList.get(rackIndex).add(serverIndex);
      }

      // Count how many regions there are.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        numRegions += entry.getValue().size();
      }
      numRegions += unassignedRegions.size();

      // Allocate all index-based structures now that the cardinalities are known.
      regionsToIndex = new HashMap<>(numRegions);
      servers = new ServerName[numServers];
      serversPerHost = new int[numHosts][];
      serversPerRack = new int[numRacks][];
      regions = new RegionInfo[numRegions];
      regionIndexToServerIndex = new int[numRegions];
      initialRegionIndexToServerIndex = new int[numRegions];
      regionIndexToTableIndex = new int[numRegions];
      regionIndexToPrimaryIndex = new int[numRegions];
      regionLoads = new Deque[numRegions];

      regionLocations = new int[numRegions][];
      serverIndicesSortedByRegionCount = new Integer[numServers];
      serverIndicesSortedByLocality = new Integer[numServers];
      localityPerServer = new float[numServers];

      serverIndexToHostIndex = new int[numServers];
      serverIndexToRackIndex = new int[numServers];
      regionsPerServer = new int[numServers][];
      serverIndexToRegionsOffset = new int[numServers];
      regionsPerHost = new int[numHosts][];
      regionsPerRack = new int[numRacks][];
      primariesOfRegionsPerServer = new int[numServers][];
      primariesOfRegionsPerHost = new int[numHosts][];
      primariesOfRegionsPerRack = new int[numRacks][];

      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;

      // First pass over the assignments: pick the canonical ServerName per server
      // index and size the per-server region arrays.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        if (entry.getKey() == null) {
          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
          continue;
        }
        int serverIndex = serversToIndex.get(entry.getKey().getAddress());

        // keep the servername if this is the first server name for this hostname
        // or this servername has the newest startcode.
        if (servers[serverIndex] == null ||
            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
          servers[serverIndex] = entry.getKey();
        }

        if (regionsPerServer[serverIndex] != null) {
          // there is another server with the same hostAndPort in ClusterState.
          // allocate the array for the total size
          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
        } else {
          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
        }
        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
        serverIndicesSortedByLocality[serverIndex] = serverIndex;
      }

      // Invert the host/rack index maps into plain arrays.
      hosts = new String[numHosts];
      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
        hosts[entry.getValue()] = entry.getKey();
      }
      racks = new String[numRacks];
      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
        racks[entry.getValue()] = entry.getKey();
      }

      // Second pass: record host/rack mapping per server and register every
      // assigned region; serverIndexToRegionsOffset resumes correctly when the
      // same server index appears under multiple ServerName keys.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        int serverIndex = serversToIndex.get(entry.getKey().getAddress());
        regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];

        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
        serverIndexToHostIndex[serverIndex] = hostIndex;

        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
        serverIndexToRackIndex[serverIndex] = rackIndex;

        for (RegionInfo region : entry.getValue()) {
          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
          regionIndex++;
        }
        serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
      }

      // Unassigned regions get server index -1.
      for (RegionInfo region : unassignedRegions) {
        registerRegion(region, regionIndex, -1, loads, regionFinder);
        regionIndex++;
      }

      for (int i = 0; i < serversPerHostList.size(); i++) {
        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
        for (int j = 0; j < serversPerHost[i].length; j++) {
          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
        }
        if (serversPerHost[i].length > 1) {
          multiServersPerHost = true;
        }
      }

      for (int i = 0; i < serversPerRackList.size(); i++) {
        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
        for (int j = 0; j < serversPerRack[i].length; j++) {
          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
        }
      }

      // Per-table region counts and the per-table maximum over all servers.
      numTables = tables.size();
      numRegionsPerServerPerTable = new int[numServers][numTables];

      for (int i = 0; i < numServers; i++) {
        for (int j = 0; j < numTables; j++) {
          numRegionsPerServerPerTable[i][j] = 0;
        }
      }

      for (int i=0; i < regionIndexToServerIndex.length; i++) {
        if (regionIndexToServerIndex[i] >= 0) {
          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
        }
      }

      numMaxRegionsPerTable = new int[numTables];
      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // Map every region to the index of its primary replica; a primary that is
      // not present in this snapshot maps to -1.
      for (int i = 0; i < regions.length; i ++) {
        RegionInfo info = regions[i];
        if (RegionReplicaUtil.isDefaultReplica(info)) {
          regionIndexToPrimaryIndex[i] = i;
        } else {
          hasRegionReplicas = true;
          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
        }
      }

      for (int i = 0; i < regionsPerServer.length; i++) {
        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
        for (int j = 0; j < regionsPerServer[i].length; j++) {
          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
          primariesOfRegionsPerServer[i][j] = primaryIndex;
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerServer[i]);
      }

      // compute regionsPerHost
      if (multiServersPerHost) {
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionsPerHost = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
          }
          regionsPerHost[i] = new int[numRegionsPerHost];
          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
        }
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionPerHostIndex = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
              int region = regionsPerServer[serversPerHost[i][j]][k];
              regionsPerHost[i][numRegionPerHostIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
              numRegionPerHostIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerHost[i]);
        }
      }

      // compute regionsPerRack
      if (numRacks > 1) {
        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionsPerRack = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
          }
          regionsPerRack[i] = new int[numRegionsPerRack];
          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
        }

        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionPerRackIndex = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
              int region = regionsPerServer[serversPerRack[i][j]][k];
              regionsPerRack[i][numRegionPerRackIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
              numRegionPerRackIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerRack[i]);
        }
      }
    }
475
    /**
     * Helper for Cluster constructor to handle a region: records the region's
     * table, fills its index mappings, attaches its load history and
     * (optionally) its block-location server indexes.
     * @param region the region being registered
     * @param regionIndex dense index assigned to this region
     * @param serverIndex index of the hosting server, or -1 when unassigned
     * @param loads load history keyed by region name (full or encoded); may be null
     * @param regionFinder optional locality source; null skips location lookup
     */
    private void registerRegion(RegionInfo region, int regionIndex,
        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder) {
      String tableName = region.getTable().getNameAsString();
      // First time we see this table: assign it the next table index.
      if (!tablesToIndex.containsKey(tableName)) {
        tables.add(tableName);
        tablesToIndex.put(tableName, tablesToIndex.size());
      }
      int tableIndex = tablesToIndex.get(tableName);

      regionsToIndex.put(region, regionIndex);
      regions[regionIndex] = region;
      regionIndexToServerIndex[regionIndex] = serverIndex;
      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
      regionIndexToTableIndex[regionIndex] = tableIndex;

      // region load
      if (loads != null) {
        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
        // That could have failed if the RegionLoad is using the other regionName
        if (rl == null) {
          // Try getting the region load using encoded name.
          rl = loads.get(region.getEncodedName());
        }
        regionLoads[regionIndex] = rl;
      }

      if (regionFinder != null) {
        // region location
        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
        regionLocations[regionIndex] = new int[loc.size()];
        for (int i = 0; i < loc.size(); i++) {
          // -1 marks a location that is not a live server in this snapshot.
          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
              : (serversToIndex.get(loc.get(i).getAddress()) == null ? -1
                  : serversToIndex.get(loc.get(i).getAddress()));
        }
      }
    }
515
516    /**
517     * Returns true iff a given server has less regions than the balanced amount
518     */
519    public boolean serverHasTooFewRegions(int server) {
520      int minLoad = this.numRegions / numServers;
521      int numRegions = getNumRegions(server);
522      return numRegions < minLoad;
523    }
524
525    /**
526     * Retrieves and lazily initializes a field storing the locality of
527     * every region/server combination
528     */
529    public float[][] getOrComputeRackLocalities() {
530      if (rackLocalities == null || regionsToMostLocalEntities == null) {
531        computeCachedLocalities();
532      }
533      return rackLocalities;
534    }
535
536    /**
537     * Lazily initializes and retrieves a mapping of region -> server for which region has
538     * the highest the locality
539     */
540    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
541      if (rackLocalities == null || regionsToMostLocalEntities == null) {
542        computeCachedLocalities();
543      }
544      return regionsToMostLocalEntities[type.ordinal()];
545    }
546
547    /**
548     * Looks up locality from cache of localities. Will create cache if it does
549     * not already exist.
550     */
551    public float getOrComputeLocality(int region, int entity, LocalityType type) {
552      switch (type) {
553        case SERVER:
554          return getLocalityOfRegion(region, entity);
555        case RACK:
556          return getOrComputeRackLocalities()[region][entity];
557        default:
558          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
559      }
560    }
561
562    /**
563     * Returns locality weighted by region size in MB. Will create locality cache
564     * if it does not already exist.
565     */
566    public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
567      return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
568    }
569
570    /**
571     * Returns the size in MB from the most recent RegionLoad for region
572     */
573    public int getRegionSizeMB(int region) {
574      Deque<BalancerRegionLoad> load = regionLoads[region];
575      // This means regions have no actual data on disk
576      if (load == null) {
577        return 0;
578      }
579      return regionLoads[region].getLast().getStorefileSizeMB();
580    }
581
582    /**
583     * Computes and caches the locality for each region/rack combinations,
584     * as well as storing a mapping of region -> server and region -> rack such that server
585     * and rack have the highest locality for region
586     */
587    private void computeCachedLocalities() {
588      rackLocalities = new float[numRegions][numRacks];
589      regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
590
591      // Compute localities and find most local server per region
592      for (int region = 0; region < numRegions; region++) {
593        int serverWithBestLocality = 0;
594        float bestLocalityForRegion = 0;
595        for (int server = 0; server < numServers; server++) {
596          // Aggregate per-rack locality
597          float locality = getLocalityOfRegion(region, server);
598          int rack = serverIndexToRackIndex[server];
599          int numServersInRack = serversPerRack[rack].length;
600          rackLocalities[region][rack] += locality / numServersInRack;
601
602          if (locality > bestLocalityForRegion) {
603            serverWithBestLocality = server;
604            bestLocalityForRegion = locality;
605          }
606        }
607        regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
608
609        // Find most local rack per region
610        int rackWithBestLocality = 0;
611        float bestRackLocalityForRegion = 0.0f;
612        for (int rack = 0; rack < numRacks; rack++) {
613          float rackLocality = rackLocalities[region][rack];
614          if (rackLocality > bestRackLocalityForRegion) {
615            bestRackLocalityForRegion = rackLocality;
616            rackWithBestLocality = rack;
617          }
618        }
619        regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
620      }
621
622    }
623
624    /**
625     * Maps region index to rack index
626     */
627    public int getRackForRegion(int region) {
628      return serverIndexToRackIndex[regionIndexToServerIndex[region]];
629    }
630
    /** Granularity at which locality is tracked: a single server or a whole rack. */
    enum LocalityType {
      SERVER,
      RACK
    }
635
636    /** An action to move or swap a region */
637    public static class Action {
638      public enum Type {
639        ASSIGN_REGION,
640        MOVE_REGION,
641        SWAP_REGIONS,
642        NULL,
643      }
644
645      public Type type;
646      public Action (Type type) {this.type = type;}
647      /** Returns an Action which would undo this action */
648      public Action undoAction() { return this; }
649      @Override
650      public String toString() { return type + ":";}
651    }
652
653    public static class AssignRegionAction extends Action {
654      public int region;
655      public int server;
656      public AssignRegionAction(int region, int server) {
657        super(Type.ASSIGN_REGION);
658        this.region = region;
659        this.server = server;
660      }
661      @Override
662      public Action undoAction() {
663        // TODO implement this. This action is not being used by the StochasticLB for now
664        // in case it uses it, we should implement this function.
665        throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
666      }
667      @Override
668      public String toString() {
669        return type + ": " + region + ":" + server;
670      }
671    }
672
673    public static class MoveRegionAction extends Action {
674      public int region;
675      public int fromServer;
676      public int toServer;
677
678      public MoveRegionAction(int region, int fromServer, int toServer) {
679        super(Type.MOVE_REGION);
680        this.fromServer = fromServer;
681        this.region = region;
682        this.toServer = toServer;
683      }
684      @Override
685      public Action undoAction() {
686        return new MoveRegionAction (region, toServer, fromServer);
687      }
688      @Override
689      public String toString() {
690        return type + ": " + region + ":" + fromServer + " -> " + toServer;
691      }
692    }
693
694    public static class SwapRegionsAction extends Action {
695      public int fromServer;
696      public int fromRegion;
697      public int toServer;
698      public int toRegion;
699      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
700        super(Type.SWAP_REGIONS);
701        this.fromServer = fromServer;
702        this.fromRegion = fromRegion;
703        this.toServer = toServer;
704        this.toRegion = toRegion;
705      }
706      @Override
707      public Action undoAction() {
708        return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
709      }
710      @Override
711      public String toString() {
712        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
713      }
714    }
715
716    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION",
717        justification="Mistake. Too disruptive to change now")
718    public static final Action NullAction = new Action(Type.NULL);
719
720    public void doAction(Action action) {
721      switch (action.type) {
722      case NULL: break;
723      case ASSIGN_REGION:
724        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
725        assert action instanceof AssignRegionAction: action.getClass();
726        AssignRegionAction ar = (AssignRegionAction) action;
727        regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
728        regionMoved(ar.region, -1, ar.server);
729        break;
730      case MOVE_REGION:
731        assert action instanceof MoveRegionAction: action.getClass();
732        MoveRegionAction mra = (MoveRegionAction) action;
733        regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
734        regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
735        regionMoved(mra.region, mra.fromServer, mra.toServer);
736        break;
737      case SWAP_REGIONS:
738        assert action instanceof SwapRegionsAction: action.getClass();
739        SwapRegionsAction a = (SwapRegionsAction) action;
740        regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
741        regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
742        regionMoved(a.fromRegion, a.fromServer, a.toServer);
743        regionMoved(a.toRegion, a.toServer, a.fromServer);
744        break;
745      default:
746        throw new RuntimeException("Uknown action:" + action.type);
747      }
748    }
749
750    /**
751     * Return true if the placement of region on server would lower the availability
752     * of the region in question
753     * @return true or false
754     */
755    boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
756      if (!serversToIndex.containsKey(serverName.getAddress())) {
757        return false; // safeguard against race between cluster.servers and servers from LB method args
758      }
759      int server = serversToIndex.get(serverName.getAddress());
760      int region = regionsToIndex.get(regionInfo);
761
762      // Region replicas for same region should better assign to different servers
763      for (int i : regionsPerServer[server]) {
764        RegionInfo otherRegionInfo = regions[i];
765        if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
766          return true;
767        }
768      }
769
770      int primary = regionIndexToPrimaryIndex[region];
771      if (primary == -1) {
772        return false;
773      }
774      // there is a subset relation for server < host < rack
775      // check server first
776      if (contains(primariesOfRegionsPerServer[server], primary)) {
777        // check for whether there are other servers that we can place this region
778        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
779          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
780            return true; // meaning there is a better server
781          }
782        }
783        return false; // there is not a better server to place this
784      }
785
786      // check host
787      if (multiServersPerHost) {
788        // these arrays would only be allocated if we have more than one server per host
789        int host = serverIndexToHostIndex[server];
790        if (contains(primariesOfRegionsPerHost[host], primary)) {
791          // check for whether there are other hosts that we can place this region
792          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
793            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
794              return true; // meaning there is a better host
795            }
796          }
797          return false; // there is not a better host to place this
798        }
799      }
800
801      // check rack
802      if (numRacks > 1) {
803        int rack = serverIndexToRackIndex[server];
804        if (contains(primariesOfRegionsPerRack[rack], primary)) {
805          // check for whether there are other racks that we can place this region
806          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
807            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
808              return true; // meaning there is a better rack
809            }
810          }
811          return false; // there is not a better rack to place this
812        }
813      }
814
815      return false;
816    }
817
    /**
     * Assign the given region to the given server via an AssignRegionAction.
     * Silently ignores servers unknown to this snapshot (guards against a race
     * with the caller's server list).
     * NOTE(review): assumes regionInfo is present in regionsToIndex — an unknown
     * region would NPE on unboxing; confirm callers only pass tracked regions.
     */
    void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
      if (!serversToIndex.containsKey(serverName.getAddress())) {
        return;
      }
      int server = serversToIndex.get(serverName.getAddress());
      int region = regionsToIndex.get(regionInfo);
      doAction(new AssignRegionAction(region, server));
    }
826
    /**
     * Bookkeeping after a region moves from oldServer to newServer: refresh the
     * region-to-server index, the moved-region counter (relative to the initial
     * assignment), the per-server/per-table counts (including the cached
     * per-table maximum), and the per-server/host/rack primary-replica arrays.
     * oldServer may be -1 for a fresh assignment (no previous server).
     */
    void regionMoved(int region, int oldServer, int newServer) {
      regionIndexToServerIndex[region] = newServer;
      if (initialRegionIndexToServerIndex[region] == newServer) {
        numMovedRegions--; //region moved back to original location
      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
        numMovedRegions++; //region moved from original location
      }
      int tableIndex = regionIndexToTableIndex[region];
      if (oldServer >= 0) {
        numRegionsPerServerPerTable[oldServer][tableIndex]--;
      }
      numRegionsPerServerPerTable[newServer][tableIndex]++;

      //check whether this caused maxRegionsPerTable in the new Server to be updated
      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
          == numMaxRegionsPerTable[tableIndex]) {
        //recompute maxRegionsPerTable since the previous value was coming from the old server
        numMaxRegionsPerTable[tableIndex] = 0;
        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // update for servers
      int primary = regionIndexToPrimaryIndex[region];
      if (oldServer >= 0) {
        primariesOfRegionsPerServer[oldServer] = removeRegion(
          primariesOfRegionsPerServer[oldServer], primary);
      }
      primariesOfRegionsPerServer[newServer] = addRegionSorted(
        primariesOfRegionsPerServer[newServer], primary);

      // update for hosts
      if (multiServersPerHost) {
        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
        int newHost = serverIndexToHostIndex[newServer];
        if (newHost != oldHost) {
          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
          if (oldHost >= 0) {
            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
            primariesOfRegionsPerHost[oldHost] = removeRegion(
              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
          }
        }
      }

      // update for racks
      if (numRacks > 1) {
        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
        int newRack = serverIndexToRackIndex[newServer];
        if (newRack != oldRack) {
          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
          if (oldRack >= 0) {
            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
            primariesOfRegionsPerRack[oldRack] = removeRegion(
              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
          }
        }
      }
    }
893
    /**
     * Return a new array equal to {@code regions} with the first occurrence of
     * {@code regionIndex} removed. Assumes the value is present: if absent, the
     * copy loop (or the trailing arraycopy) throws an out-of-bounds exception.
     */
    int[] removeRegion(int[] regions, int regionIndex) {
      //TODO: this maybe costly. Consider using linked lists
      int[] newRegions = new int[regions.length - 1];
      int i = 0;
      for (i = 0; i < regions.length; i++) {
        if (regions[i] == regionIndex) {
          break;
        }
        newRegions[i] = regions[i];
      }
      // copy the tail that follows the removed element
      System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
      return newRegions;
    }
907
908    int[] addRegion(int[] regions, int regionIndex) {
909      int[] newRegions = new int[regions.length + 1];
910      System.arraycopy(regions, 0, newRegions, 0, regions.length);
911      newRegions[newRegions.length - 1] = regionIndex;
912      return newRegions;
913    }
914
915    int[] addRegionSorted(int[] regions, int regionIndex) {
916      int[] newRegions = new int[regions.length + 1];
917      int i = 0;
918      for (i = 0; i < regions.length; i++) { // find the index to insert
919        if (regions[i] > regionIndex) {
920          break;
921        }
922      }
923      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
924      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
925      newRegions[i] = regionIndex;
926
927      return newRegions;
928    }
929
930    int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
931      int i = 0;
932      for (i = 0; i < regions.length; i++) {
933        if (regions[i] == regionIndex) {
934          regions[i] = newRegionIndex;
935          break;
936        }
937      }
938      return regions;
939    }
940
    /** Re-sort the cached server index array by current region count, ascending. */
    void sortServersByRegionCount() {
      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
    }
944
    /** @return number of regions currently hosted by the given server index */
    int getNumRegions(int server) {
      return regionsPerServer[server].length;
    }
948
    /** Membership test via binary search; {@code arr} must be sorted ascending. */
    boolean contains(int[] arr, int val) {
      return Arrays.binarySearch(arr, val) >= 0;
    }
952
    // Orders server indices by how many regions each currently hosts.
    private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
954
    /**
     * Find the region on the given server with the lowest HDFS block locality,
     * skipping regions with no blocks. Returns the region index, or -1 when
     * there is no region finder, the server hosts no regions, or no candidate
     * beats the initial threshold (locality of exactly 1.0 is never selected
     * because the comparison is strict).
     */
    int getLowestLocalityRegionOnServer(int serverIndex) {
      if (regionFinder != null) {
        float lowestLocality = 1.0f;
        int lowestLocalityRegionIndex = -1;
        if (regionsPerServer[serverIndex].length == 0) {
          // No regions on that region server
          return -1;
        }
        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
          int regionIndex = regionsPerServer[serverIndex][j];
          HDFSBlocksDistribution distribution = regionFinder
              .getBlockDistribution(regions[regionIndex]);
          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
          // skip empty region
          if (distribution.getUniqueBlocksTotalWeight() == 0) {
            continue;
          }
          if (locality < lowestLocality) {
            lowestLocality = locality;
            lowestLocalityRegionIndex = j;
          }
        }
        if (lowestLocalityRegionIndex == -1) {
          return -1;
        }
        if (LOG.isTraceEnabled()) {
          LOG.trace("Lowest locality region is "
              + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
                  .getRegionNameAsString() + " with locality " + lowestLocality
              + " and its region server contains " + regionsPerServer[serverIndex].length
              + " regions");
        }
        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
      } else {
        return -1;
      }
    }
992
993    float getLocalityOfRegion(int region, int server) {
994      if (regionFinder != null) {
995        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
996        return distribution.getBlockLocalityIndex(servers[server].getHostname());
997      } else {
998        return 0f;
999      }
1000    }
1001
    /** Override the cached total region count (test/bookkeeping hook). */
    protected void setNumRegions(int numRegions) {
      this.numRegions = numRegions;
    }
1005
    /** Override the moved-region counter (test/bookkeeping hook). */
    protected void setNumMovedRegions(int numMovedRegions) {
      this.numMovedRegions = numMovedRegions;
    }
1009
    /** Debug dump: server list, per-server region arrays and summary counters. */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
        justification="Not important but should be fixed")
    @Override
    public String toString() {
      StringBuilder desc = new StringBuilder("Cluster={servers=[");
      for(ServerName sn:servers) {
        desc.append(sn.getAddress().toString()).append(", ");
      }
      desc.append("], serverIndicesSortedByRegionCount=")
          .append(Arrays.toString(serverIndicesSortedByRegionCount))
          .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));

      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
          .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
          .append(", numTables=").append(numTables).append(", numMovedRegions=")
          .append(numMovedRegions).append('}');
      return desc.toString();
    }
1028  }
1029
  // slop for regions: tolerated deviation from the per-server load average
  protected float slop;
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  protected float overallSlop;
  protected Configuration config = HBaseConfiguration.create();
  // maps servers to racks; replaced via setConf()/setRackManager()
  protected RackManager rackManager;
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
  protected MetricsBalancer metricsBalancer = null;
  // latest cluster metrics pushed in via setClusterMetrics()
  protected ClusterMetrics clusterStatus = null;
  protected ServerName masterServerName;
  protected MasterServices services;

  /**
   * @deprecated since 2.4.0, will be removed in 3.0.0.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a>
   */
  @Deprecated
  protected boolean onlySystemTablesOnMaster;

  // set from MasterServices.isInMaintenanceMode() in setMasterServices()
  protected boolean maintenanceMode;
1051
1052  @Override
1053  public void setConf(Configuration conf) {
1054    this.config = conf;
1055    setSlop(conf);
1056    if (slop < 0) slop = 0;
1057    else if (slop > 1) slop = 1;
1058
1059    if (overallSlop < 0) overallSlop = 0;
1060    else if (overallSlop > 1) overallSlop = 1;
1061
1062    this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config);
1063
1064    this.rackManager = new RackManager(getConf());
1065    if (useRegionFinder) {
1066      regionFinder.setConf(conf);
1067    }
1068    this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
1069    // Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
1070    LOG.info("slop={}, systemTablesOnMaster={}",
1071        this.slop, this.onlySystemTablesOnMaster);
1072  }
1073
  /**
   * Read the slop values: "hbase.regions.slop" (default 0.2) and
   * "hbase.regions.overallSlop" (defaults to the region slop).
   * Values are clamped later by setConf().
   */
  protected void setSlop(Configuration conf) {
    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
  }
1078
1079  /**
1080   * Check if a region belongs to some system table.
1081   * If so, the primary replica may be expected to be put on the master regionserver.
1082   *
1083   * @deprecated since 2.4.0, will be removed in 3.0.0.
1084   * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a>
1085   */
1086  @Deprecated
1087  public boolean shouldBeOnMaster(RegionInfo region) {
1088    return (this.maintenanceMode || this.onlySystemTablesOnMaster)
1089        && region.getTable().isSystemTable();
1090  }
1091
1092  /**
1093   * Balance the regions that should be on master regionserver.
1094   *
1095   * @deprecated since 2.4.0, will be removed in 3.0.0.
1096   * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a>
1097   */
1098  @Deprecated
1099  protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) {
1100    if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null;
1101    List<RegionPlan> plans = null;
1102    List<RegionInfo> regions = clusterMap.get(masterServerName);
1103    if (regions != null) {
1104      Iterator<ServerName> keyIt = null;
1105      for (RegionInfo region: regions) {
1106        if (shouldBeOnMaster(region)) continue;
1107
1108        // Find a non-master regionserver to host the region
1109        if (keyIt == null || !keyIt.hasNext()) {
1110          keyIt = clusterMap.keySet().iterator();
1111        }
1112        ServerName dest = keyIt.next();
1113        if (masterServerName.equals(dest)) {
1114          if (!keyIt.hasNext()) {
1115            keyIt = clusterMap.keySet().iterator();
1116          }
1117          dest = keyIt.next();
1118        }
1119
1120        // Move this region away from the master regionserver
1121        RegionPlan plan = new RegionPlan(region, masterServerName, dest);
1122        if (plans == null) {
1123          plans = new ArrayList<>();
1124        }
1125        plans.add(plan);
1126      }
1127    }
1128    for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) {
1129      if (masterServerName.equals(server.getKey())) continue;
1130      for (RegionInfo region: server.getValue()) {
1131        if (!shouldBeOnMaster(region)) continue;
1132
1133        // Move this region to the master regionserver
1134        RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName);
1135        if (plans == null) {
1136          plans = new ArrayList<>();
1137        }
1138        plans.add(plan);
1139      }
1140    }
1141    return plans;
1142  }
1143
1144  /**
1145   * If master is configured to carry system tables only, in here is
1146   * where we figure what to assign it.
1147   *
1148   * @deprecated since 2.4.0, will be removed in 3.0.0.
1149   * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a>
1150   */
1151  @Deprecated
1152  @NonNull
1153  protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions(
1154      Collection<RegionInfo> regions, List<ServerName> servers) {
1155    Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
1156    if (this.maintenanceMode || this.onlySystemTablesOnMaster) {
1157      if (masterServerName != null && servers.contains(masterServerName)) {
1158        assignments.put(masterServerName, new ArrayList<>());
1159        for (RegionInfo region : regions) {
1160          if (shouldBeOnMaster(region)) {
1161            assignments.get(masterServerName).add(region);
1162          }
1163        }
1164      }
1165    }
1166    return assignments;
1167  }
1168
  /** @return the currently configured {@link Configuration} */
  @Override
  public Configuration getConf() {
    return this.config;
  }
1173
  /**
   * Cache the latest cluster metrics and forward them to the region finder
   * when one is in use. Synchronized so metrics updates are not interleaved.
   */
  @Override
  public synchronized void setClusterMetrics(ClusterMetrics st) {
    this.clusterStatus = st;
    if (useRegionFinder) {
      regionFinder.setClusterMetrics(st);
    }
  }
1181
1182
  /**
   * Wire in the master services: remember the master's server name, pass the
   * services to the region finder, and latch maintenance mode if active.
   */
  @Override
  public void setMasterServices(MasterServices masterServices) {
    masterServerName = masterServices.getServerName();
    this.services = masterServices;
    if (useRegionFinder) {
      this.regionFinder.setServices(masterServices);
    }
    if (this.services.isInMaintenanceMode()) {
      this.maintenanceMode = true;
    }
  }
1194
  /**
   * After master startup, warm the region finder's HDFS block-distribution
   * cache for all currently assigned regions. Failures are logged and
   * deliberately ignored — this is a best-effort cache warm-up.
   */
  @Override
  public void postMasterStartupInitialize() {
    if (services != null && regionFinder != null) {
      try {
        Set<RegionInfo> regions =
            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
        regionFinder.refreshAndWait(regions);
      } catch (Exception e) {
        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
      }
    }
  }
1207
  /** Replace the rack manager (primarily a testing hook). */
  public void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }
1211
1212  protected boolean needsBalance(TableName tableName, Cluster c) {
1213    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
1214    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
1215      if (LOG.isDebugEnabled()) {
1216        LOG.debug("Not running balancer because only " + cs.getNumServers()
1217            + " active regionserver(s)");
1218      }
1219      return false;
1220    }
1221    if(areSomeRegionReplicasColocated(c)) return true;
1222    if(idleRegionServerExist(c)) {
1223      return true;
1224    }
1225
1226    // Check if we even need to do any load balancing
1227    // HBASE-3681 check sloppiness first
1228    float average = cs.getLoadAverage(); // for logging
1229    int floor = (int) Math.floor(average * (1 - slop));
1230    int ceiling = (int) Math.ceil(average * (1 + slop));
1231    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
1232      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
1233      if (LOG.isTraceEnabled()) {
1234        // If nothing to balance, then don't say anything unless trace-level logging.
1235        LOG.trace("Skipping load balancing because balanced cluster; " +
1236          "servers=" + cs.getNumServers() +
1237          " regions=" + cs.getNumRegions() + " average=" + average +
1238          " mostloaded=" + serversByLoad.lastKey().getLoad() +
1239          " leastloaded=" + serversByLoad.firstKey().getLoad());
1240      }
1241      return false;
1242    }
1243    return true;
1244  }
1245
1246  /**
1247   * Subclasses should implement this to return true if the cluster has nodes that hosts
1248   * multiple replicas for the same region, or, if there are multiple racks and the same
1249   * rack hosts replicas of the same region
1250   * @param c Cluster information
1251   * @return whether region replicas are currently co-located
1252   */
1253  protected boolean areSomeRegionReplicasColocated(Cluster c) {
1254    return false;
1255  }
1256
1257  protected final boolean idleRegionServerExist(Cluster c){
1258    boolean isServerExistsWithMoreRegions = false;
1259    boolean isServerExistsWithZeroRegions = false;
1260    for (int[] serverList: c.regionsPerServer){
1261      if (serverList.length > 1) {
1262        isServerExistsWithMoreRegions = true;
1263      }
1264      if (serverList.length == 0) {
1265        isServerExistsWithZeroRegions = true;
1266      }
1267    }
1268    return isServerExistsWithMoreRegions && isServerExistsWithZeroRegions;
1269  }
1270
1271  /**
1272   * Generates a bulk assignment plan to be used on cluster startup using a
1273   * simple round-robin assignment.
1274   * <p>
1275   * Takes a list of all the regions and all the servers in the cluster and
1276   * returns a map of each server to the regions that it should be assigned.
1277   * <p>
1278   * Currently implemented as a round-robin assignment. Same invariant as load
1279   * balancing, all servers holding floor(avg) or ceiling(avg).
1280   *
1281   * TODO: Use block locations from HDFS to place regions with their blocks
1282   *
1283   * @param regions all regions
1284   * @param servers all servers
1285   * @return map of server to the regions it should take, or emptyMap if no
1286   *         assignment is possible (ie. no servers)
1287   */
1288  @Override
1289  @NonNull
1290  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
1291      List<ServerName> servers) throws HBaseIOException {
1292    metricsBalancer.incrMiscInvocations();
1293    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
1294    if (!assignments.isEmpty()) {
1295      servers = new ArrayList<>(servers);
1296      // Guarantee not to put other regions on master
1297      servers.remove(masterServerName);
1298      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1299      if (!masterRegions.isEmpty()) {
1300        regions = new ArrayList<>(regions);
1301        regions.removeAll(masterRegions);
1302      }
1303    }
1304    /**
1305     * only need assign system table
1306     */
1307    if (this.maintenanceMode || regions.isEmpty()) {
1308      return assignments;
1309    }
1310
1311    int numServers = servers == null ? 0 : servers.size();
1312    if (numServers == 0) {
1313      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
1314      return Collections.emptyMap();
1315    }
1316
1317    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
1318    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
1319    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
1320    // and balanced. This should also run fast with fewer number of iterations.
1321
1322    if (numServers == 1) { // Only one server, nothing fancy we can do here
1323      ServerName server = servers.get(0);
1324      assignments.put(server, new ArrayList<>(regions));
1325      return assignments;
1326    }
1327
1328    Cluster cluster = createCluster(servers, regions);
1329    roundRobinAssignment(cluster, regions, servers, assignments);
1330    return assignments;
1331  }
1332
  /**
   * Build a {@link Cluster} snapshot for the given servers and regions.
   * When any of the tables involved uses region replicas, the snapshot is
   * built from the entire cluster's assignments (so existing replicas are
   * visible); otherwise only the assignments of the passed regions are
   * fetched, for performance. Servers with no assignments are included with
   * an empty region list.
   * @throws HBaseIOException if table descriptors cannot be read
   */
  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions)
      throws HBaseIOException {
    boolean hasRegionReplica = false;
    try {
      if (services != null && services.getTableDescriptors() != null) {
        Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
        for (RegionInfo regionInfo : regions) {
          TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
          if (td != null && td.getRegionReplication() > 1) {
            hasRegionReplica = true;
            break;
          }
        }
      }
    } catch (IOException ioe) {
      throw new HBaseIOException(ioe);
    }

    // Get the snapshot of the current assignments for the regions in question, and then create
    // a cluster out of it. Note that we might have replicas already assigned to some servers
    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
    // replicas of the regions that are passed (for performance).
    Map<ServerName, List<RegionInfo>> clusterState = null;
    if (!hasRegionReplica) {
      clusterState = getRegionAssignmentsByServer(regions);
    } else {
      // for the case where we have region replica it is better we get the entire cluster's snapshot
      clusterState = getRegionAssignmentsByServer(null);
    }

    for (ServerName server : servers) {
      if (!clusterState.containsKey(server)) {
        clusterState.put(server, EMPTY_REGION_LIST);
      }
    }
    return new Cluster(regions, clusterState, null, this.regionFinder,
        rackManager);
  }
1371
  /** Filter the given servers down to those the server manager deems idle. */
  private List<ServerName> findIdleServers(List<ServerName> servers) {
    return this.services.getServerManager()
            .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
  }
1376
1377  /**
1378   * Used to assign a single region to a random server.
1379   */
1380  @Override
1381  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
1382      throws HBaseIOException {
1383    metricsBalancer.incrMiscInvocations();
1384    if (servers != null && servers.contains(masterServerName)) {
1385      if (shouldBeOnMaster(regionInfo)) {
1386        return masterServerName;
1387      }
1388      if (!LoadBalancer.isTablesOnMaster(getConf())) {
1389        // Guarantee we do not put any regions on master
1390        servers = new ArrayList<>(servers);
1391        servers.remove(masterServerName);
1392      }
1393    }
1394
1395    int numServers = servers == null ? 0 : servers.size();
1396    if (numServers == 0) {
1397      LOG.warn("Wanted to retain assignment but no servers to assign to");
1398      return null;
1399    }
1400    if (numServers == 1) { // Only one server, nothing fancy we can do here
1401      return servers.get(0);
1402    }
1403    List<ServerName> idleServers = findIdleServers(servers);
1404    if (idleServers.size() == 1) {
1405      return idleServers.get(0);
1406    }
1407    final List<ServerName> finalServers = idleServers.isEmpty() ?
1408            servers : idleServers;
1409    List<RegionInfo> regions = Lists.newArrayList(regionInfo);
1410    Cluster cluster = createCluster(finalServers, regions);
1411    return randomAssignment(cluster, regionInfo, finalServers);
1412  }
1413
1414  /**
1415   * Generates a bulk assignment startup plan, attempting to reuse the existing
1416   * assignment information from META, but adjusting for the specified list of
1417   * available/online servers available for assignment.
1418   * <p>
1419   * Takes a map of all regions to their existing assignment from META. Also
1420   * takes a list of online servers for regions to be assigned to. Attempts to
1421   * retain all assignment, so in some instances initial assignment will not be
1422   * completely balanced.
1423   * <p>
1424   * Any leftover regions without an existing server to be assigned to will be
1425   * assigned randomly to available servers.
1426   *
1427   * @param regions regions and existing assignment from meta
1428   * @param servers available servers
1429   * @return map of servers and regions to be assigned to them, or emptyMap if no
1430   *           assignment is possible (ie. no servers)
1431   */
1432  @Override
1433  @NonNull
1434  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
1435      List<ServerName> servers) throws HBaseIOException {
1436    // Update metrics
1437    metricsBalancer.incrMiscInvocations();
1438    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions.keySet(), servers);
1439    if (!assignments.isEmpty()) {
1440      servers = new ArrayList<>(servers);
1441      // Guarantee not to put other regions on master
1442      servers.remove(masterServerName);
1443      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1444      regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey()))
1445          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
1446    }
1447    if (this.maintenanceMode || regions.isEmpty()) {
1448      return assignments;
1449    }
1450
1451    int numServers = servers == null ? 0 : servers.size();
1452    if (numServers == 0) {
1453      LOG.warn("Wanted to do retain assignment but no servers to assign to");
1454      return Collections.emptyMap();
1455    }
1456    if (numServers == 1) { // Only one server, nothing fancy we can do here
1457      ServerName server = servers.get(0);
1458      assignments.put(server, new ArrayList<>(regions.keySet()));
1459      return assignments;
1460    }
1461
1462    // Group all of the old assignments by their hostname.
1463    // We can't group directly by ServerName since the servers all have
1464    // new start-codes.
1465
1466    // Group the servers by their hostname. It's possible we have multiple
1467    // servers on the same host on different ports.
1468    ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
1469    for (ServerName server : servers) {
1470      assignments.put(server, new ArrayList<>());
1471      serversByHostname.put(server.getHostnameLowerCase(), server);
1472    }
1473
1474    // Collection of the hostnames that used to have regions
1475    // assigned, but for which we no longer have any RS running
1476    // after the cluster restart.
1477    Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
1478
1479    // If the old servers aren't present, lets assign those regions later.
1480    List<RegionInfo> randomAssignRegions = Lists.newArrayList();
1481
1482    int numRandomAssignments = 0;
1483    int numRetainedAssigments = 0;
1484    for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
1485      RegionInfo region = entry.getKey();
1486      ServerName oldServerName = entry.getValue();
1487      List<ServerName> localServers = new ArrayList<>();
1488      if (oldServerName != null) {
1489        localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
1490      }
1491      if (localServers.isEmpty()) {
1492        // No servers on the new cluster match up with this hostname, assign randomly, later.
1493        randomAssignRegions.add(region);
1494        if (oldServerName != null) {
1495          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
1496        }
1497      } else if (localServers.size() == 1) {
1498        // the usual case - one new server on same host
1499        ServerName target = localServers.get(0);
1500        assignments.get(target).add(region);
1501        numRetainedAssigments++;
1502      } else {
1503        // multiple new servers in the cluster on this same host
1504        if (localServers.contains(oldServerName)) {
1505          assignments.get(oldServerName).add(region);
1506          numRetainedAssigments++;
1507        } else {
1508          ServerName target = null;
1509          for (ServerName tmp : localServers) {
1510            if (tmp.getPort() == oldServerName.getPort()) {
1511              target = tmp;
1512              assignments.get(tmp).add(region);
1513              numRetainedAssigments++;
1514              break;
1515            }
1516          }
1517          if (target == null) {
1518            randomAssignRegions.add(region);
1519          }
1520        }
1521      }
1522    }
1523
1524    // If servers from prior assignment aren't present, then lets do randomAssignment on regions.
1525    if (randomAssignRegions.size() > 0) {
1526      Cluster cluster = createCluster(servers, regions.keySet());
1527      for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
1528        ServerName sn = entry.getKey();
1529        for (RegionInfo region : entry.getValue()) {
1530          cluster.doAssignRegion(region, sn);
1531        }
1532      }
1533      for (RegionInfo region : randomAssignRegions) {
1534        ServerName target = randomAssignment(cluster, region, servers);
1535        assignments.get(target).add(region);
1536        numRandomAssignments++;
1537      }
1538    }
1539
1540    String randomAssignMsg = "";
1541    if (numRandomAssignments > 0) {
1542      randomAssignMsg =
1543          numRandomAssignments + " regions were assigned "
1544              + "to random hosts, since the old hosts for these regions are no "
1545              + "longer present in the cluster. These hosts were:\n  "
1546              + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
1547    }
1548
1549    LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
1550        + " retained the pre-restart assignment. " + randomAssignMsg);
1551    return assignments;
1552  }
1553
  /**
   * No-op: this base implementation requires no extra initialization; subclasses may override.
   */
  @Override
  public void initialize() throws HBaseIOException{
  }
1557
  /**
   * No-op: this base implementation does not track individual region online events.
   */
  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }
1561
  /**
   * No-op: this base implementation does not track individual region offline events.
   */
  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }
1565
  /**
   * @return true once {@link #stop(String)} has been called.
   */
  @Override
  public boolean isStopped() {
    return stopped;
  }
1570
1571  @Override
1572  public void stop(String why) {
1573    LOG.info("Load Balancer stop requested: "+why);
1574    stopped = true;
1575  }
1576
1577  /**
1578  * Updates the balancer status tag reported to JMX
1579  */
1580  public void updateBalancerStatus(boolean status) {
1581    metricsBalancer.balancerStatus(status);
1582  }
1583
1584  /**
1585   * Used to assign a single region to a random server.
1586   */
1587  private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
1588      List<ServerName> servers) {
1589    int numServers = servers.size(); // servers is not null, numServers > 1
1590    ServerName sn = null;
1591    final int maxIterations = numServers * 4;
1592    int iterations = 0;
1593    List<ServerName> usedSNs = new ArrayList<>(servers.size());
1594    do {
1595      int i = RANDOM.nextInt(numServers);
1596      sn = servers.get(i);
1597      if (!usedSNs.contains(sn)) {
1598        usedSNs.add(sn);
1599      }
1600    } while (cluster.wouldLowerAvailability(regionInfo, sn)
1601        && iterations++ < maxIterations);
1602    if (iterations >= maxIterations) {
1603      // We have reached the max. Means the servers that we collected is still lowering the
1604      // availability
1605      for (ServerName unusedServer : servers) {
1606        if (!usedSNs.contains(unusedServer)) {
1607          // check if any other unused server is there for us to use.
1608          // If so use it. Else we have not other go but to go with one of them
1609          if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
1610            sn = unusedServer;
1611            break;
1612          }
1613        }
1614      }
1615    }
1616    cluster.doAssignRegion(regionInfo, sn);
1617    return sn;
1618  }
1619
1620  /**
1621   * Round robin a list of regions to a list of servers
1622   */
1623  private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
1624    List<ServerName> servers, Map<ServerName, List<RegionInfo>> assignments) {
1625    List<RegionInfo> unassignedRegions = new ArrayList<>();
1626    int numServers = servers.size();
1627    int numRegions = regions.size();
1628    int max = (int) Math.ceil((float) numRegions / numServers);
1629    int serverIdx = 0;
1630    if (numServers > 1) {
1631      serverIdx = RANDOM.nextInt(numServers);
1632    }
1633    int regionIdx = 0;
1634    for (int j = 0; j < numServers; j++) {
1635      ServerName server = servers.get((j + serverIdx) % numServers);
1636      List<RegionInfo> serverRegions = new ArrayList<>(max);
1637      for (int i = regionIdx; i < numRegions; i += numServers) {
1638        RegionInfo region = regions.get(i % numRegions);
1639        if (cluster.wouldLowerAvailability(region, server)) {
1640          unassignedRegions.add(region);
1641        } else {
1642          serverRegions.add(region);
1643          cluster.doAssignRegion(region, server);
1644        }
1645      }
1646      assignments.put(server, serverRegions);
1647      regionIdx++;
1648    }
1649
1650
1651    List<RegionInfo> lastFewRegions = new ArrayList<>();
1652    // assign the remaining by going through the list and try to assign to servers one-by-one
1653    serverIdx = RANDOM.nextInt(numServers);
1654    OUTER : for (RegionInfo region : unassignedRegions) {
1655      boolean assigned = false;
1656      INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
1657        ServerName server = servers.get((j + serverIdx) % numServers);
1658        if (cluster.wouldLowerAvailability(region, server)) {
1659          continue INNER;
1660        } else {
1661          assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
1662          cluster.doAssignRegion(region, server);
1663          serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
1664          assigned = true;
1665          break;
1666        }
1667      }
1668      if (!assigned) {
1669        lastFewRegions.add(region);
1670      }
1671    }
1672    // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
1673    // make it optimal later. we can end up with this if numReplicas > numServers.
1674    for (RegionInfo region : lastFewRegions) {
1675      int i = RANDOM.nextInt(numServers);
1676      ServerName server = servers.get(i);
1677      assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
1678      cluster.doAssignRegion(region, server);
1679    }
1680  }
1681
1682  protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
1683    Collection<RegionInfo> regions) {
1684    if (this.services != null && this.services.getAssignmentManager() != null) {
1685      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
1686    } else {
1687      return new HashMap<>();
1688    }
1689  }
1690
1691  private Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad(
1692      Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable) {
1693    Map<ServerName, List<RegionInfo>> returnMap = new TreeMap<>();
1694    for (Map<ServerName, List<RegionInfo>> serverNameListMap : LoadOfAllTable.values()) {
1695      serverNameListMap.forEach((serverName, regionInfoList) -> {
1696        List<RegionInfo> regionInfos =
1697            returnMap.computeIfAbsent(serverName, k -> new ArrayList<>());
1698        regionInfos.addAll(regionInfoList);
1699      });
1700    }
1701    return returnMap;
1702  }
1703
  /**
   * Balances the regions of a single table (or of the whole cluster, when invoked with the
   * ensemble pseudo-table from {@link #balanceCluster}). Subclasses supply the strategy.
   * @param tableName table whose load is being balanced
   * @param loadOfOneTable current per-server region assignments for that table
   * @return region move plans; may be null (callers check for null)
   */
  @Override
  public abstract List<RegionPlan> balanceTable(TableName tableName,
      Map<ServerName, List<RegionInfo>> loadOfOneTable);
1707
1708  @Override
1709  public List<RegionPlan>
1710      balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
1711    if (isByTable) {
1712      List<RegionPlan> result = new ArrayList<>();
1713      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
1714        LOG.info("Start Generate Balance plan for table: " + tableName);
1715        List<RegionPlan> partialPlans = balanceTable(tableName, loadOfOneTable);
1716        if (partialPlans != null) {
1717          result.addAll(partialPlans);
1718        }
1719      });
1720      return result;
1721    } else {
1722      LOG.info("Start Generate Balance plan for cluster.");
1723      return balanceTable(HConstants.ENSEMBLE_TABLE_NAME, toEnsumbleTableLoad(loadOfAllTable));
1724    }
1725  }
1726
  /**
   * No-op: this base implementation ignores configuration changes; subclasses may override.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
  }
1730}