001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.balancer;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.Comparator;
027import java.util.Deque;
028import java.util.HashMap;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Map;
032import java.util.Map.Entry;
033import java.util.NavigableMap;
034import java.util.Random;
035import java.util.Set;
036import java.util.TreeMap;
037import java.util.function.Predicate;
038import java.util.stream.Collectors;
039
040import org.apache.commons.lang3.NotImplementedException;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.ClusterMetrics;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseIOException;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HDFSBlocksDistribution;
047import org.apache.hadoop.hbase.ServerMetrics;
048import org.apache.hadoop.hbase.ServerName;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.RegionReplicaUtil;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.master.LoadBalancer;
054import org.apache.hadoop.hbase.master.MasterServices;
055import org.apache.hadoop.hbase.master.RackManager;
056import org.apache.hadoop.hbase.master.RegionPlan;
057import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
060import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
061import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
062import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
063import org.apache.yetus.audience.InterfaceAudience;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067/**
 * The base class for load balancers. It provides the functions used by
069 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
070 * in the edge cases. It doesn't provide an implementation of the
071 * actual balancing algorithm.
072 *
073 */
074@InterfaceAudience.Private
075public abstract class BaseLoadBalancer implements LoadBalancer {
076  protected static final int MIN_SERVER_BALANCE = 2;
077  private volatile boolean stopped = false;
078
079  private static final List<RegionInfo> EMPTY_REGION_LIST = Collections.emptyList();
080
081  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
082    = load -> load.getRegionMetrics().isEmpty();
083
084  protected RegionLocationFinder regionFinder;
085  protected boolean useRegionFinder;
086  protected boolean isByTable = false;
087
088  private static class DefaultRackManager extends RackManager {
089    @Override
090    public String getRack(ServerName server) {
091      return UNKNOWN_RACK;
092    }
093  }
094
095  /**
096   * The constructor that uses the basic MetricsBalancer
097   */
098  protected BaseLoadBalancer() {
099    metricsBalancer = new MetricsBalancer();
100    createRegionFinder();
101  }
102
103  /**
104   * This Constructor accepts an instance of MetricsBalancer,
105   * which will be used instead of creating a new one
106   */
107  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
108    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
109    createRegionFinder();
110  }
111
112  private void createRegionFinder() {
113    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
114    if (useRegionFinder) {
115      regionFinder = new RegionLocationFinder();
116    }
117  }
118
119  /**
120   * An efficient array based implementation similar to ClusterState for keeping
121   * the status of the cluster in terms of region assignment and distribution.
   * LoadBalancers, such as StochasticLoadBalancer, use this Cluster object because
   * hundreds of thousands of hashmap manipulations are very costly, which is why this
   * class uses mostly indexes and arrays.
125   *
126   * Cluster tracks a list of unassigned regions, region assignments, and the server
127   * topology in terms of server names, hostnames and racks.
128   */
129  protected static class Cluster {
130    ServerName[] servers;
131    String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
132    String[] racks;
133    boolean multiServersPerHost = false; // whether or not any host has more than one server
134
135    ArrayList<String> tables;
136    RegionInfo[] regions;
137    Deque<BalancerRegionLoad>[] regionLoads;
138    private RegionLocationFinder regionFinder;
139
140    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
141
142    int[]   serverIndexToHostIndex;      //serverIndex -> host index
143    int[]   serverIndexToRackIndex;      //serverIndex -> rack index
144
145    int[][] regionsPerServer;            //serverIndex -> region list
146    int[]   serverIndexToRegionsOffset;  //serverIndex -> offset of region list
147    int[][] regionsPerHost;              //hostIndex -> list of regions
148    int[][] regionsPerRack;              //rackIndex -> region list
149    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
150    int[][] primariesOfRegionsPerHost;   //hostIndex -> sorted list of regions by primary region index
151    int[][] primariesOfRegionsPerRack;   //rackIndex -> sorted list of regions by primary region index
152
153    int[][] serversPerHost;              //hostIndex -> list of server indexes
154    int[][] serversPerRack;              //rackIndex -> list of server indexes
155    int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
156    int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
157    int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
158    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
159    int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
160    int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
161    boolean hasRegionReplicas = false;   //whether there is regions with replicas
162
163    Integer[] serverIndicesSortedByRegionCount;
164    Integer[] serverIndicesSortedByLocality;
165
166    Map<String, Integer> serversToIndex;
167    Map<String, Integer> hostsToIndex;
168    Map<String, Integer> racksToIndex;
169    Map<String, Integer> tablesToIndex;
170    Map<RegionInfo, Integer> regionsToIndex;
171    float[] localityPerServer;
172
173    int numServers;
174    int numHosts;
175    int numRacks;
176    int numTables;
177    int numRegions;
178
179    int numMovedRegions = 0; //num moved regions from the initial configuration
180    Map<ServerName, List<RegionInfo>> clusterState;
181
182    protected final RackManager rackManager;
183    // Maps region -> rackIndex -> locality of region on rack
184    private float[][] rackLocalities;
185    // Maps localityType -> region -> [server|rack]Index with highest locality
186    private int[][] regionsToMostLocalEntities;
187
188    protected Cluster(
189        Map<ServerName, List<RegionInfo>> clusterState,
190        Map<String, Deque<BalancerRegionLoad>> loads,
191        RegionLocationFinder regionFinder,
192        RackManager rackManager) {
193      this(null, clusterState, loads, regionFinder, rackManager);
194    }
195
196    @SuppressWarnings("unchecked")
197    protected Cluster(
198        Collection<RegionInfo> unassignedRegions,
199        Map<ServerName, List<RegionInfo>> clusterState,
200        Map<String, Deque<BalancerRegionLoad>> loads,
201        RegionLocationFinder regionFinder,
202        RackManager rackManager) {
203
204      if (unassignedRegions == null) {
205        unassignedRegions = EMPTY_REGION_LIST;
206      }
207
208      serversToIndex = new HashMap<>();
209      hostsToIndex = new HashMap<>();
210      racksToIndex = new HashMap<>();
211      tablesToIndex = new HashMap<>();
212
213      //TODO: We should get the list of tables from master
214      tables = new ArrayList<>();
215      this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
216
217      numRegions = 0;
218
219      List<List<Integer>> serversPerHostList = new ArrayList<>();
220      List<List<Integer>> serversPerRackList = new ArrayList<>();
221      this.clusterState = clusterState;
222      this.regionFinder = regionFinder;
223
224      // Use servername and port as there can be dead servers in this list. We want everything with
225      // a matching hostname and port to have the same index.
226      for (ServerName sn : clusterState.keySet()) {
227        if (sn == null) {
228          LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " +
229              "skipping; unassigned regions?");
230          if (LOG.isTraceEnabled()) {
231            LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
232          }
233          continue;
234        }
235        if (serversToIndex.get(sn.getAddress().toString()) == null) {
236          serversToIndex.put(sn.getHostAndPort(), numServers++);
237        }
238        if (!hostsToIndex.containsKey(sn.getHostname())) {
239          hostsToIndex.put(sn.getHostname(), numHosts++);
240          serversPerHostList.add(new ArrayList<>(1));
241        }
242
243        int serverIndex = serversToIndex.get(sn.getHostAndPort());
244        int hostIndex = hostsToIndex.get(sn.getHostname());
245        serversPerHostList.get(hostIndex).add(serverIndex);
246
247        String rack = this.rackManager.getRack(sn);
248        if (!racksToIndex.containsKey(rack)) {
249          racksToIndex.put(rack, numRacks++);
250          serversPerRackList.add(new ArrayList<>());
251        }
252        int rackIndex = racksToIndex.get(rack);
253        serversPerRackList.get(rackIndex).add(serverIndex);
254      }
255
256      // Count how many regions there are.
257      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
258        numRegions += entry.getValue().size();
259      }
260      numRegions += unassignedRegions.size();
261
262      regionsToIndex = new HashMap<>(numRegions);
263      servers = new ServerName[numServers];
264      serversPerHost = new int[numHosts][];
265      serversPerRack = new int[numRacks][];
266      regions = new RegionInfo[numRegions];
267      regionIndexToServerIndex = new int[numRegions];
268      initialRegionIndexToServerIndex = new int[numRegions];
269      regionIndexToTableIndex = new int[numRegions];
270      regionIndexToPrimaryIndex = new int[numRegions];
271      regionLoads = new Deque[numRegions];
272
273      regionLocations = new int[numRegions][];
274      serverIndicesSortedByRegionCount = new Integer[numServers];
275      serverIndicesSortedByLocality = new Integer[numServers];
276      localityPerServer = new float[numServers];
277
278      serverIndexToHostIndex = new int[numServers];
279      serverIndexToRackIndex = new int[numServers];
280      regionsPerServer = new int[numServers][];
281      serverIndexToRegionsOffset = new int[numServers];
282      regionsPerHost = new int[numHosts][];
283      regionsPerRack = new int[numRacks][];
284      primariesOfRegionsPerServer = new int[numServers][];
285      primariesOfRegionsPerHost = new int[numHosts][];
286      primariesOfRegionsPerRack = new int[numRacks][];
287
288      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
289
290      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
291        if (entry.getKey() == null) {
292          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
293          continue;
294        }
295        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
296
297        // keep the servername if this is the first server name for this hostname
298        // or this servername has the newest startcode.
299        if (servers[serverIndex] == null ||
300            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
301          servers[serverIndex] = entry.getKey();
302        }
303
304        if (regionsPerServer[serverIndex] != null) {
305          // there is another server with the same hostAndPort in ClusterState.
306          // allocate the array for the total size
307          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
308        } else {
309          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
310        }
311        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
312        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
313        serverIndicesSortedByLocality[serverIndex] = serverIndex;
314      }
315
316      hosts = new String[numHosts];
317      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
318        hosts[entry.getValue()] = entry.getKey();
319      }
320      racks = new String[numRacks];
321      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
322        racks[entry.getValue()] = entry.getKey();
323      }
324
325      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
326        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
327        regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
328
329        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
330        serverIndexToHostIndex[serverIndex] = hostIndex;
331
332        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
333        serverIndexToRackIndex[serverIndex] = rackIndex;
334
335        for (RegionInfo region : entry.getValue()) {
336          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
337          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
338          regionIndex++;
339        }
340        serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
341      }
342
343      for (RegionInfo region : unassignedRegions) {
344        registerRegion(region, regionIndex, -1, loads, regionFinder);
345        regionIndex++;
346      }
347
348      for (int i = 0; i < serversPerHostList.size(); i++) {
349        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
350        for (int j = 0; j < serversPerHost[i].length; j++) {
351          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
352        }
353        if (serversPerHost[i].length > 1) {
354          multiServersPerHost = true;
355        }
356      }
357
358      for (int i = 0; i < serversPerRackList.size(); i++) {
359        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
360        for (int j = 0; j < serversPerRack[i].length; j++) {
361          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
362        }
363      }
364
365      numTables = tables.size();
366      numRegionsPerServerPerTable = new int[numServers][numTables];
367
368      for (int i = 0; i < numServers; i++) {
369        for (int j = 0; j < numTables; j++) {
370          numRegionsPerServerPerTable[i][j] = 0;
371        }
372      }
373
374      for (int i=0; i < regionIndexToServerIndex.length; i++) {
375        if (regionIndexToServerIndex[i] >= 0) {
376          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
377        }
378      }
379
380      numMaxRegionsPerTable = new int[numTables];
381      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
382        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
383          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
384            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
385          }
386        }
387      }
388
389      for (int i = 0; i < regions.length; i ++) {
390        RegionInfo info = regions[i];
391        if (RegionReplicaUtil.isDefaultReplica(info)) {
392          regionIndexToPrimaryIndex[i] = i;
393        } else {
394          hasRegionReplicas = true;
395          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
396          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
397        }
398      }
399
400      for (int i = 0; i < regionsPerServer.length; i++) {
401        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
402        for (int j = 0; j < regionsPerServer[i].length; j++) {
403          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
404          primariesOfRegionsPerServer[i][j] = primaryIndex;
405        }
406        // sort the regions by primaries.
407        Arrays.sort(primariesOfRegionsPerServer[i]);
408      }
409
410      // compute regionsPerHost
411      if (multiServersPerHost) {
412        for (int i = 0 ; i < serversPerHost.length; i++) {
413          int numRegionsPerHost = 0;
414          for (int j = 0; j < serversPerHost[i].length; j++) {
415            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
416          }
417          regionsPerHost[i] = new int[numRegionsPerHost];
418          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
419        }
420        for (int i = 0 ; i < serversPerHost.length; i++) {
421          int numRegionPerHostIndex = 0;
422          for (int j = 0; j < serversPerHost[i].length; j++) {
423            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
424              int region = regionsPerServer[serversPerHost[i][j]][k];
425              regionsPerHost[i][numRegionPerHostIndex] = region;
426              int primaryIndex = regionIndexToPrimaryIndex[region];
427              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
428              numRegionPerHostIndex++;
429            }
430          }
431          // sort the regions by primaries.
432          Arrays.sort(primariesOfRegionsPerHost[i]);
433        }
434      }
435
436      // compute regionsPerRack
437      if (numRacks > 1) {
438        for (int i = 0 ; i < serversPerRack.length; i++) {
439          int numRegionsPerRack = 0;
440          for (int j = 0; j < serversPerRack[i].length; j++) {
441            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
442          }
443          regionsPerRack[i] = new int[numRegionsPerRack];
444          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
445        }
446
447        for (int i = 0 ; i < serversPerRack.length; i++) {
448          int numRegionPerRackIndex = 0;
449          for (int j = 0; j < serversPerRack[i].length; j++) {
450            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
451              int region = regionsPerServer[serversPerRack[i][j]][k];
452              regionsPerRack[i][numRegionPerRackIndex] = region;
453              int primaryIndex = regionIndexToPrimaryIndex[region];
454              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
455              numRegionPerRackIndex++;
456            }
457          }
458          // sort the regions by primaries.
459          Arrays.sort(primariesOfRegionsPerRack[i]);
460        }
461      }
462    }
463
464    /** Helper for Cluster constructor to handle a region */
465    private void registerRegion(RegionInfo region, int regionIndex,
466        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
467        RegionLocationFinder regionFinder) {
468      String tableName = region.getTable().getNameAsString();
469      if (!tablesToIndex.containsKey(tableName)) {
470        tables.add(tableName);
471        tablesToIndex.put(tableName, tablesToIndex.size());
472      }
473      int tableIndex = tablesToIndex.get(tableName);
474
475      regionsToIndex.put(region, regionIndex);
476      regions[regionIndex] = region;
477      regionIndexToServerIndex[regionIndex] = serverIndex;
478      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
479      regionIndexToTableIndex[regionIndex] = tableIndex;
480
481      // region load
482      if (loads != null) {
483        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
484        // That could have failed if the RegionLoad is using the other regionName
485        if (rl == null) {
486          // Try getting the region load using encoded name.
487          rl = loads.get(region.getEncodedName());
488        }
489        regionLoads[regionIndex] = rl;
490      }
491
492      if (regionFinder != null) {
493        // region location
494        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
495        regionLocations[regionIndex] = new int[loc.size()];
496        for (int i = 0; i < loc.size(); i++) {
497          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
498              : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
499                  : serversToIndex.get(loc.get(i).getHostAndPort()));
500        }
501      }
502    }
503
504    /**
505     * Returns true iff a given server has less regions than the balanced amount
506     */
507    public boolean serverHasTooFewRegions(int server) {
508      int minLoad = this.numRegions / numServers;
509      int numRegions = getNumRegions(server);
510      return numRegions < minLoad;
511    }
512
513    /**
514     * Retrieves and lazily initializes a field storing the locality of
515     * every region/server combination
516     */
517    public float[][] getOrComputeRackLocalities() {
518      if (rackLocalities == null || regionsToMostLocalEntities == null) {
519        computeCachedLocalities();
520      }
521      return rackLocalities;
522    }
523
524    /**
525     * Lazily initializes and retrieves a mapping of region -> server for which region has
526     * the highest the locality
527     */
528    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
529      if (rackLocalities == null || regionsToMostLocalEntities == null) {
530        computeCachedLocalities();
531      }
532      return regionsToMostLocalEntities[type.ordinal()];
533    }
534
535    /**
536     * Looks up locality from cache of localities. Will create cache if it does
537     * not already exist.
538     */
539    public float getOrComputeLocality(int region, int entity, LocalityType type) {
540      switch (type) {
541        case SERVER:
542          return getLocalityOfRegion(region, entity);
543        case RACK:
544          return getOrComputeRackLocalities()[region][entity];
545        default:
546          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
547      }
548    }
549
550    /**
551     * Returns locality weighted by region size in MB. Will create locality cache
552     * if it does not already exist.
553     */
554    public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
555      return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
556    }
557
558    /**
559     * Returns the size in MB from the most recent RegionLoad for region
560     */
561    public int getRegionSizeMB(int region) {
562      Deque<BalancerRegionLoad> load = regionLoads[region];
563      // This means regions have no actual data on disk
564      if (load == null) {
565        return 0;
566      }
567      return regionLoads[region].getLast().getStorefileSizeMB();
568    }
569
570    /**
571     * Computes and caches the locality for each region/rack combinations,
572     * as well as storing a mapping of region -> server and region -> rack such that server
573     * and rack have the highest locality for region
574     */
575    private void computeCachedLocalities() {
576      rackLocalities = new float[numRegions][numRacks];
577      regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
578
579      // Compute localities and find most local server per region
580      for (int region = 0; region < numRegions; region++) {
581        int serverWithBestLocality = 0;
582        float bestLocalityForRegion = 0;
583        for (int server = 0; server < numServers; server++) {
584          // Aggregate per-rack locality
585          float locality = getLocalityOfRegion(region, server);
586          int rack = serverIndexToRackIndex[server];
587          int numServersInRack = serversPerRack[rack].length;
588          rackLocalities[region][rack] += locality / numServersInRack;
589
590          if (locality > bestLocalityForRegion) {
591            serverWithBestLocality = server;
592            bestLocalityForRegion = locality;
593          }
594        }
595        regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
596
597        // Find most local rack per region
598        int rackWithBestLocality = 0;
599        float bestRackLocalityForRegion = 0.0f;
600        for (int rack = 0; rack < numRacks; rack++) {
601          float rackLocality = rackLocalities[region][rack];
602          if (rackLocality > bestRackLocalityForRegion) {
603            bestRackLocalityForRegion = rackLocality;
604            rackWithBestLocality = rack;
605          }
606        }
607        regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
608      }
609
610    }
611
612    /**
613     * Maps region index to rack index
614     */
615    public int getRackForRegion(int region) {
616      return serverIndexToRackIndex[regionIndexToServerIndex[region]];
617    }
618
619    enum LocalityType {
620      SERVER,
621      RACK
622    }
623
624    /** An action to move or swap a region */
625    public static class Action {
626      public enum Type {
627        ASSIGN_REGION,
628        MOVE_REGION,
629        SWAP_REGIONS,
630        NULL,
631      }
632
633      public Type type;
634      public Action (Type type) {this.type = type;}
635      /** Returns an Action which would undo this action */
636      public Action undoAction() { return this; }
637      @Override
638      public String toString() { return type + ":";}
639    }
640
641    public static class AssignRegionAction extends Action {
642      public int region;
643      public int server;
644      public AssignRegionAction(int region, int server) {
645        super(Type.ASSIGN_REGION);
646        this.region = region;
647        this.server = server;
648      }
649      @Override
650      public Action undoAction() {
651        // TODO implement this. This action is not being used by the StochasticLB for now
652        // in case it uses it, we should implement this function.
653        throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
654      }
655      @Override
656      public String toString() {
657        return type + ": " + region + ":" + server;
658      }
659    }
660
661    public static class MoveRegionAction extends Action {
662      public int region;
663      public int fromServer;
664      public int toServer;
665
666      public MoveRegionAction(int region, int fromServer, int toServer) {
667        super(Type.MOVE_REGION);
668        this.fromServer = fromServer;
669        this.region = region;
670        this.toServer = toServer;
671      }
672      @Override
673      public Action undoAction() {
674        return new MoveRegionAction (region, toServer, fromServer);
675      }
676      @Override
677      public String toString() {
678        return type + ": " + region + ":" + fromServer + " -> " + toServer;
679      }
680    }
681
682    public static class SwapRegionsAction extends Action {
683      public int fromServer;
684      public int fromRegion;
685      public int toServer;
686      public int toRegion;
687      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
688        super(Type.SWAP_REGIONS);
689        this.fromServer = fromServer;
690        this.fromRegion = fromRegion;
691        this.toServer = toServer;
692        this.toRegion = toRegion;
693      }
694      @Override
695      public Action undoAction() {
696        return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
697      }
698      @Override
699      public String toString() {
700        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
701      }
702    }
703
704    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION",
705        justification="Mistake. Too disruptive to change now")
706    public static final Action NullAction = new Action(Type.NULL);
707
708    public void doAction(Action action) {
709      switch (action.type) {
710      case NULL: break;
711      case ASSIGN_REGION:
712        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
713        assert action instanceof AssignRegionAction: action.getClass();
714        AssignRegionAction ar = (AssignRegionAction) action;
715        regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
716        regionMoved(ar.region, -1, ar.server);
717        break;
718      case MOVE_REGION:
719        assert action instanceof MoveRegionAction: action.getClass();
720        MoveRegionAction mra = (MoveRegionAction) action;
721        regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
722        regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
723        regionMoved(mra.region, mra.fromServer, mra.toServer);
724        break;
725      case SWAP_REGIONS:
726        assert action instanceof SwapRegionsAction: action.getClass();
727        SwapRegionsAction a = (SwapRegionsAction) action;
728        regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
729        regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
730        regionMoved(a.fromRegion, a.fromServer, a.toServer);
731        regionMoved(a.toRegion, a.toServer, a.fromServer);
732        break;
733      default:
734        throw new RuntimeException("Uknown action:" + action.type);
735      }
736    }
737
738    /**
739     * Return true if the placement of region on server would lower the availability
740     * of the region in question
741     * @return true or false
742     */
743    boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
744      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
745        return false; // safeguard against race between cluster.servers and servers from LB method args
746      }
747      int server = serversToIndex.get(serverName.getHostAndPort());
748      int region = regionsToIndex.get(regionInfo);
749
750      // Region replicas for same region should better assign to different servers
751      for (int i : regionsPerServer[server]) {
752        RegionInfo otherRegionInfo = regions[i];
753        if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
754          return true;
755        }
756      }
757
758      int primary = regionIndexToPrimaryIndex[region];
759      if (primary == -1) {
760        return false;
761      }
762      // there is a subset relation for server < host < rack
763      // check server first
764      if (contains(primariesOfRegionsPerServer[server], primary)) {
765        // check for whether there are other servers that we can place this region
766        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
767          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
768            return true; // meaning there is a better server
769          }
770        }
771        return false; // there is not a better server to place this
772      }
773
774      // check host
775      if (multiServersPerHost) {
776        // these arrays would only be allocated if we have more than one server per host
777        int host = serverIndexToHostIndex[server];
778        if (contains(primariesOfRegionsPerHost[host], primary)) {
779          // check for whether there are other hosts that we can place this region
780          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
781            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
782              return true; // meaning there is a better host
783            }
784          }
785          return false; // there is not a better host to place this
786        }
787      }
788
789      // check rack
790      if (numRacks > 1) {
791        int rack = serverIndexToRackIndex[server];
792        if (contains(primariesOfRegionsPerRack[rack], primary)) {
793          // check for whether there are other racks that we can place this region
794          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
795            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
796              return true; // meaning there is a better rack
797            }
798          }
799          return false; // there is not a better rack to place this
800        }
801      }
802
803      return false;
804    }
805
806    void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
807      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
808        return;
809      }
810      int server = serversToIndex.get(serverName.getHostAndPort());
811      int region = regionsToIndex.get(regionInfo);
812      doAction(new AssignRegionAction(region, server));
813    }
814
    /**
     * Updates the bookkeeping of this cluster model after {@code region} has been placed on
     * {@code newServer}. A negative {@code oldServer} means the region had no previous server
     * and every "remove from the old location" step is skipped.
     * <p>
     * Touches, in this order: the region-to-server index, the moved-region counter, the
     * per-server/per-table region counts together with the per-table maximum, and the
     * primary-replica lists per server, per host and per rack.
     * @param region index of the region that moved
     * @param oldServer index of the server the region left, or a negative value if there was none
     * @param newServer index of the server that now hosts the region
     */
    void regionMoved(int region, int oldServer, int newServer) {
      regionIndexToServerIndex[region] = newServer;
      // numMovedRegions tracks how many regions are away from their initial server
      if (initialRegionIndexToServerIndex[region] == newServer) {
        numMovedRegions--; //region moved back to original location
      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
        numMovedRegions++; //region moved from original location
      }
      int tableIndex = regionIndexToTableIndex[region];
      if (oldServer >= 0) {
        numRegionsPerServerPerTable[oldServer][tableIndex]--;
      }
      numRegionsPerServerPerTable[newServer][tableIndex]++;

      //check whether this caused maxRegionsPerTable in the new Server to be updated
      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
          == numMaxRegionsPerTable[tableIndex]) {
        // the "+ 1" undoes the decrement above: the old server held the maximum before the move
        //recompute maxRegionsPerTable since the previous value was coming from the old server
        numMaxRegionsPerTable[tableIndex] = 0;
        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // update for servers
      int primary = regionIndexToPrimaryIndex[region];
      if (oldServer >= 0) {
        primariesOfRegionsPerServer[oldServer] = removeRegion(
          primariesOfRegionsPerServer[oldServer], primary);
      }
      // sorted insert: contains() runs a binary search over these arrays
      primariesOfRegionsPerServer[newServer] = addRegionSorted(
        primariesOfRegionsPerServer[newServer], primary);

      // update for hosts
      if (multiServersPerHost) {
        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
        int newHost = serverIndexToHostIndex[newServer];
        // a move between two servers of the same host leaves the host-level arrays untouched
        if (newHost != oldHost) {
          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
          if (oldHost >= 0) {
            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
            primariesOfRegionsPerHost[oldHost] = removeRegion(
              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
          }
        }
      }

      // update for racks
      if (numRacks > 1) {
        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
        int newRack = serverIndexToRackIndex[newServer];
        // a move within the same rack leaves the rack-level arrays untouched
        if (newRack != oldRack) {
          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
          if (oldRack >= 0) {
            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
            primariesOfRegionsPerRack[oldRack] = removeRegion(
              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
          }
        }
      }
    }
881
882    int[] removeRegion(int[] regions, int regionIndex) {
883      //TODO: this maybe costly. Consider using linked lists
884      int[] newRegions = new int[regions.length - 1];
885      int i = 0;
886      for (i = 0; i < regions.length; i++) {
887        if (regions[i] == regionIndex) {
888          break;
889        }
890        newRegions[i] = regions[i];
891      }
892      System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
893      return newRegions;
894    }
895
896    int[] addRegion(int[] regions, int regionIndex) {
897      int[] newRegions = new int[regions.length + 1];
898      System.arraycopy(regions, 0, newRegions, 0, regions.length);
899      newRegions[newRegions.length - 1] = regionIndex;
900      return newRegions;
901    }
902
903    int[] addRegionSorted(int[] regions, int regionIndex) {
904      int[] newRegions = new int[regions.length + 1];
905      int i = 0;
906      for (i = 0; i < regions.length; i++) { // find the index to insert
907        if (regions[i] > regionIndex) {
908          break;
909        }
910      }
911      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
912      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
913      newRegions[i] = regionIndex;
914
915      return newRegions;
916    }
917
918    int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
919      int i = 0;
920      for (i = 0; i < regions.length; i++) {
921        if (regions[i] == regionIndex) {
922          regions[i] = newRegionIndex;
923          break;
924        }
925      }
926      return regions;
927    }
928
    /**
     * Re-sorts {@code serverIndicesSortedByRegionCount} in place using
     * {@code numRegionsComparator}, i.e. by the value of {@link #getNumRegions(int)} for each
     * server index, ascending.
     */
    void sortServersByRegionCount() {
      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
    }
932
    /**
     * Returns the number of regions currently recorded for a server.
     * @param server index of the server
     * @return length of {@code regionsPerServer[server]}
     */
    int getNumRegions(int server) {
      return regionsPerServer[server].length;
    }
936
937    boolean contains(int[] arr, int val) {
938      return Arrays.binarySearch(arr, val) >= 0;
939    }
940
    // Orders server indices by the number of regions they host (ascending, see getNumRegions);
    // used by sortServersByRegionCount().
    private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
942
943    int getLowestLocalityRegionOnServer(int serverIndex) {
944      if (regionFinder != null) {
945        float lowestLocality = 1.0f;
946        int lowestLocalityRegionIndex = -1;
947        if (regionsPerServer[serverIndex].length == 0) {
948          // No regions on that region server
949          return -1;
950        }
951        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
952          int regionIndex = regionsPerServer[serverIndex][j];
953          HDFSBlocksDistribution distribution = regionFinder
954              .getBlockDistribution(regions[regionIndex]);
955          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
956          // skip empty region
957          if (distribution.getUniqueBlocksTotalWeight() == 0) {
958            continue;
959          }
960          if (locality < lowestLocality) {
961            lowestLocality = locality;
962            lowestLocalityRegionIndex = j;
963          }
964        }
965        if (lowestLocalityRegionIndex == -1) {
966          return -1;
967        }
968        if (LOG.isTraceEnabled()) {
969          LOG.trace("Lowest locality region is "
970              + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
971                  .getRegionNameAsString() + " with locality " + lowestLocality
972              + " and its region server contains " + regionsPerServer[serverIndex].length
973              + " regions");
974        }
975        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
976      } else {
977        return -1;
978      }
979    }
980
981    float getLocalityOfRegion(int region, int server) {
982      if (regionFinder != null) {
983        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
984        return distribution.getBlockLocalityIndex(servers[server].getHostname());
985      } else {
986        return 0f;
987      }
988    }
989
    /**
     * Overwrites the {@code numRegions} counter of this cluster model. Marked as visible for
     * testing only.
     * @param numRegions the value to store
     */
    @VisibleForTesting
    protected void setNumRegions(int numRegions) {
      this.numRegions = numRegions;
    }
994
    /**
     * Overwrites the {@code numMovedRegions} counter of this cluster model. Marked as visible for
     * testing only.
     * @param numMovedRegions the value to store
     */
    @VisibleForTesting
    protected void setNumMovedRegions(int numMovedRegions) {
      this.numMovedRegions = numMovedRegions;
    }
999
1000    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
1001        justification="Not important but should be fixed")
1002    @Override
1003    public String toString() {
1004      StringBuilder desc = new StringBuilder("Cluster={servers=[");
1005      for(ServerName sn:servers) {
1006        desc.append(sn.getHostAndPort()).append(", ");
1007      }
1008      desc.append("], serverIndicesSortedByRegionCount=")
1009          .append(Arrays.toString(serverIndicesSortedByRegionCount))
1010          .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));
1011
1012      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
1013          .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
1014          .append(", numTables=").append(numTables).append(", numMovedRegions=")
1015          .append(numMovedRegions).append('}');
1016      return desc.toString();
1017    }
1018  }
1019
  // Slop for regions: read from "hbase.regions.slop" in setSlop() and clamped to [0, 1] in
  // setConf(); needsBalance() derives the floor/ceiling around the average load from it.
  protected float slop;
  // overallSlop to control simpleLoadBalancer's cluster level threshold; read from
  // "hbase.regions.overallSlop" (defaulting to slop) and clamped to [0, 1] in setConf().
  protected float overallSlop;
  // Active configuration; replaced by setConf().
  protected Configuration config = HBaseConfiguration.create();
  // Assigned in setConf() and setRackManager(); handed to the Cluster built by createCluster().
  protected RackManager rackManager;
  // Shared random source, seeded with the wall-clock time at class initialization.
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
  // Balancer metrics sink; starts out null (where it gets assigned is not shown in this part
  // of the file).
  protected MetricsBalancer metricsBalancer = null;
  // Latest cluster metrics handed to setClusterMetrics().
  protected ClusterMetrics clusterStatus = null;
  // Name of the master server, taken from MasterServices in setMasterServices().
  protected ServerName masterServerName;
  // Master services handle supplied through setMasterServices().
  protected MasterServices services;
  // Result of LoadBalancer.isSystemTablesOnlyOnMaster(config); refreshed in setConf().
  protected boolean onlySystemTablesOnMaster;
  // Set to true in setMasterServices() when the master reports maintenance mode.
  protected boolean maintenanceMode;
1034
1035  @Override
1036  public void setConf(Configuration conf) {
1037    this.config = conf;
1038    setSlop(conf);
1039    if (slop < 0) slop = 0;
1040    else if (slop > 1) slop = 1;
1041
1042    if (overallSlop < 0) overallSlop = 0;
1043    else if (overallSlop > 1) overallSlop = 1;
1044
1045    this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config);
1046
1047    this.rackManager = new RackManager(getConf());
1048    if (useRegionFinder) {
1049      regionFinder.setConf(conf);
1050    }
1051    this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
1052    // Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
1053    LOG.info("slop={}, systemTablesOnMaster={}",
1054        this.slop, this.onlySystemTablesOnMaster);
1055  }
1056
1057  protected void setSlop(Configuration conf) {
1058    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
1059    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
1060  }
1061
1062  /**
1063   * Check if a region belongs to some system table.
1064   * If so, the primary replica may be expected to be put on the master regionserver.
1065   */
1066  public boolean shouldBeOnMaster(RegionInfo region) {
1067    return (this.maintenanceMode || this.onlySystemTablesOnMaster)
1068        && region.getTable().isSystemTable();
1069  }
1070
1071  /**
1072   * Balance the regions that should be on master regionserver.
1073   */
1074  protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) {
1075    if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null;
1076    List<RegionPlan> plans = null;
1077    List<RegionInfo> regions = clusterMap.get(masterServerName);
1078    if (regions != null) {
1079      Iterator<ServerName> keyIt = null;
1080      for (RegionInfo region: regions) {
1081        if (shouldBeOnMaster(region)) continue;
1082
1083        // Find a non-master regionserver to host the region
1084        if (keyIt == null || !keyIt.hasNext()) {
1085          keyIt = clusterMap.keySet().iterator();
1086        }
1087        ServerName dest = keyIt.next();
1088        if (masterServerName.equals(dest)) {
1089          if (!keyIt.hasNext()) {
1090            keyIt = clusterMap.keySet().iterator();
1091          }
1092          dest = keyIt.next();
1093        }
1094
1095        // Move this region away from the master regionserver
1096        RegionPlan plan = new RegionPlan(region, masterServerName, dest);
1097        if (plans == null) {
1098          plans = new ArrayList<>();
1099        }
1100        plans.add(plan);
1101      }
1102    }
1103    for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) {
1104      if (masterServerName.equals(server.getKey())) continue;
1105      for (RegionInfo region: server.getValue()) {
1106        if (!shouldBeOnMaster(region)) continue;
1107
1108        // Move this region to the master regionserver
1109        RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName);
1110        if (plans == null) {
1111          plans = new ArrayList<>();
1112        }
1113        plans.add(plan);
1114      }
1115    }
1116    return plans;
1117  }
1118
1119  /**
1120   * If master is configured to carry system tables only, in here is
1121   * where we figure what to assign it.
1122   */
1123  protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions(
1124      Collection<RegionInfo> regions, List<ServerName> servers) {
1125    if (servers == null || regions == null || regions.isEmpty()) {
1126      return null;
1127    }
1128    Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
1129    if (this.maintenanceMode || this.onlySystemTablesOnMaster) {
1130      if (masterServerName != null && servers.contains(masterServerName)) {
1131        assignments.put(masterServerName, new ArrayList<>());
1132        for (RegionInfo region : regions) {
1133          if (shouldBeOnMaster(region)) {
1134            assignments.get(masterServerName).add(region);
1135          }
1136        }
1137      }
1138    }
1139    return assignments;
1140  }
1141
  /**
   * @return the configuration currently held by this balancer (the one last passed to
   *         {@link #setConf(Configuration)}, or the initial default)
   */
  @Override
  public Configuration getConf() {
    return this.config;
  }
1146
  /**
   * Stores the given cluster metrics in {@code clusterStatus} and, when a region finder is in
   * use, forwards them to it.
   * @param st the cluster metrics to remember
   */
  @Override
  public synchronized void setClusterMetrics(ClusterMetrics st) {
    this.clusterStatus = st;
    if (useRegionFinder) {
      regionFinder.setClusterMetrics(st);
    }
  }
1154
1155
  /**
   * Remembers the master services handle and the master's server name, forwards the handle to
   * the region finder when one is in use, and switches this balancer into maintenance mode if
   * the master reports it. Maintenance mode is only ever switched on here, never off.
   * @param masterServices the master services to use
   */
  @Override
  public void setMasterServices(MasterServices masterServices) {
    masterServerName = masterServices.getServerName();
    this.services = masterServices;
    if (useRegionFinder) {
      this.regionFinder.setServices(masterServices);
    }
    if (this.services.isInMaintenanceMode()) {
      this.maintenanceMode = true;
    }
  }
1167
1168  @Override
1169  public void postMasterStartupInitialize() {
1170    if (services != null && regionFinder != null) {
1171      try {
1172        Set<RegionInfo> regions =
1173            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
1174        regionFinder.refreshAndWait(regions);
1175      } catch (Exception e) {
1176        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
1177      }
1178    }
1179  }
1180
  /**
   * Replaces the rack manager used by this balancer. Note that {@link #setConf(Configuration)}
   * installs a new one, so a manager set here does not survive a later {@code setConf} call.
   * @param rackManager the rack manager to use
   */
  public void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }
1184
1185  protected boolean needsBalance(TableName tableName, Cluster c) {
1186    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
1187    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
1188      if (LOG.isDebugEnabled()) {
1189        LOG.debug("Not running balancer because only " + cs.getNumServers()
1190            + " active regionserver(s)");
1191      }
1192      return false;
1193    }
1194    if(areSomeRegionReplicasColocated(c)) return true;
1195    // Check if we even need to do any load balancing
1196    // HBASE-3681 check sloppiness first
1197    float average = cs.getLoadAverage(); // for logging
1198    int floor = (int) Math.floor(average * (1 - slop));
1199    int ceiling = (int) Math.ceil(average * (1 + slop));
1200    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
1201      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
1202      if (LOG.isTraceEnabled()) {
1203        // If nothing to balance, then don't say anything unless trace-level logging.
1204        LOG.trace("Skipping load balancing because balanced cluster; " +
1205          "servers=" + cs.getNumServers() +
1206          " regions=" + cs.getNumRegions() + " average=" + average +
1207          " mostloaded=" + serversByLoad.lastKey().getLoad() +
1208          " leastloaded=" + serversByLoad.firstKey().getLoad());
1209      }
1210      return false;
1211    }
1212    return true;
1213  }
1214
1215  /**
1216   * Subclasses should implement this to return true if the cluster has nodes that hosts
1217   * multiple replicas for the same region, or, if there are multiple racks and the same
1218   * rack hosts replicas of the same region
1219   * @param c Cluster information
1220   * @return whether region replicas are currently co-located
1221   */
1222  protected boolean areSomeRegionReplicasColocated(Cluster c) {
1223    return false;
1224  }
1225
1226  /**
1227   * Generates a bulk assignment plan to be used on cluster startup using a
1228   * simple round-robin assignment.
1229   * <p>
1230   * Takes a list of all the regions and all the servers in the cluster and
1231   * returns a map of each server to the regions that it should be assigned.
1232   * <p>
1233   * Currently implemented as a round-robin assignment. Same invariant as load
1234   * balancing, all servers holding floor(avg) or ceiling(avg).
1235   *
1236   * TODO: Use block locations from HDFS to place regions with their blocks
1237   *
1238   * @param regions all regions
1239   * @param servers all servers
1240   * @return map of server to the regions it should take, or null if no
1241   *         assignment is possible (ie. no regions or no servers)
1242   */
1243  @Override
1244  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
1245      List<ServerName> servers) throws HBaseIOException {
1246    metricsBalancer.incrMiscInvocations();
1247    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
1248    if (assignments != null && !assignments.isEmpty()) {
1249      servers = new ArrayList<>(servers);
1250      // Guarantee not to put other regions on master
1251      servers.remove(masterServerName);
1252      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1253      if (!masterRegions.isEmpty()) {
1254        regions = new ArrayList<>(regions);
1255        regions.removeAll(masterRegions);
1256      }
1257    }
1258    if (this.maintenanceMode || regions == null || regions.isEmpty()) {
1259      return assignments;
1260    }
1261
1262    int numServers = servers == null ? 0 : servers.size();
1263    if (numServers == 0) {
1264      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
1265      return null;
1266    }
1267
1268    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
1269    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
1270    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
1271    // and balanced. This should also run fast with fewer number of iterations.
1272
1273    if (numServers == 1) { // Only one server, nothing fancy we can do here
1274      ServerName server = servers.get(0);
1275      assignments.put(server, new ArrayList<>(regions));
1276      return assignments;
1277    }
1278
1279    Cluster cluster = createCluster(servers, regions);
1280    roundRobinAssignment(cluster, regions, servers, assignments);
1281    return assignments;
1282  }
1283
1284  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions)
1285      throws HBaseIOException {
1286    boolean hasRegionReplica = false;
1287    try {
1288      if (services != null && services.getTableDescriptors() != null) {
1289        Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
1290        for (RegionInfo regionInfo : regions) {
1291          TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
1292          if (td != null && td.getRegionReplication() > 1) {
1293            hasRegionReplica = true;
1294            break;
1295          }
1296        }
1297      }
1298    } catch (IOException ioe) {
1299      throw new HBaseIOException(ioe);
1300    }
1301
1302    // Get the snapshot of the current assignments for the regions in question, and then create
1303    // a cluster out of it. Note that we might have replicas already assigned to some servers
1304    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
1305    // replicas of the regions that are passed (for performance).
1306    Map<ServerName, List<RegionInfo>> clusterState = null;
1307    if (!hasRegionReplica) {
1308      clusterState = getRegionAssignmentsByServer(regions);
1309    } else {
1310      // for the case where we have region replica it is better we get the entire cluster's snapshot
1311      clusterState = getRegionAssignmentsByServer(null);
1312    }
1313
1314    for (ServerName server : servers) {
1315      if (!clusterState.containsKey(server)) {
1316        clusterState.put(server, EMPTY_REGION_LIST);
1317      }
1318    }
1319    return new Cluster(regions, clusterState, null, this.regionFinder,
1320        rackManager);
1321  }
1322
  /**
   * Filters {@code servers} through the server manager's
   * {@code getOnlineServersListWithPredicator} using {@code IDLE_SERVER_PREDICATOR}.
   * NOTE(review): what counts as "idle" is defined by that predicate, which is declared outside
   * this part of the file. Requires {@code services} to have been set.
   * @param servers candidate servers
   * @return the subset returned by the server manager
   */
  private List<ServerName> findIdleServers(List<ServerName> servers) {
    return this.services.getServerManager()
            .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
  }
1327
1328  /**
1329   * Used to assign a single region to a random server.
1330   */
1331  @Override
1332  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
1333      throws HBaseIOException {
1334    metricsBalancer.incrMiscInvocations();
1335    if (servers != null && servers.contains(masterServerName)) {
1336      if (shouldBeOnMaster(regionInfo)) {
1337        return masterServerName;
1338      }
1339      if (!LoadBalancer.isTablesOnMaster(getConf())) {
1340        // Guarantee we do not put any regions on master
1341        servers = new ArrayList<>(servers);
1342        servers.remove(masterServerName);
1343      }
1344    }
1345
1346    int numServers = servers == null ? 0 : servers.size();
1347    if (numServers == 0) {
1348      LOG.warn("Wanted to retain assignment but no servers to assign to");
1349      return null;
1350    }
1351    if (numServers == 1) { // Only one server, nothing fancy we can do here
1352      return servers.get(0);
1353    }
1354    List<ServerName> idleServers = findIdleServers(servers);
1355    if (idleServers.size() == 1) {
1356      return idleServers.get(0);
1357    }
1358    final List<ServerName> finalServers = idleServers.isEmpty() ?
1359            servers : idleServers;
1360    List<RegionInfo> regions = Lists.newArrayList(regionInfo);
1361    Cluster cluster = createCluster(finalServers, regions);
1362    return randomAssignment(cluster, regionInfo, finalServers);
1363  }
1364
1365  /**
1366   * Generates a bulk assignment startup plan, attempting to reuse the existing
1367   * assignment information from META, but adjusting for the specified list of
1368   * available/online servers available for assignment.
1369   * <p>
1370   * Takes a map of all regions to their existing assignment from META. Also
1371   * takes a list of online servers for regions to be assigned to. Attempts to
1372   * retain all assignment, so in some instances initial assignment will not be
1373   * completely balanced.
1374   * <p>
1375   * Any leftover regions without an existing server to be assigned to will be
1376   * assigned randomly to available servers.
1377   *
1378   * @param regions regions and existing assignment from meta
1379   * @param servers available servers
1380   * @return map of servers and regions to be assigned to them
1381   */
1382  @Override
1383  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
1384      List<ServerName> servers) throws HBaseIOException {
1385    // Update metrics
1386    metricsBalancer.incrMiscInvocations();
1387    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions.keySet(), servers);
1388    if (assignments != null && !assignments.isEmpty()) {
1389      servers = new ArrayList<>(servers);
1390      // Guarantee not to put other regions on master
1391      servers.remove(masterServerName);
1392      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1393      regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey()))
1394          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
1395    }
1396    if (this.maintenanceMode || regions.isEmpty()) {
1397      return assignments;
1398    }
1399
1400    int numServers = servers == null ? 0 : servers.size();
1401    if (numServers == 0) {
1402      LOG.warn("Wanted to do retain assignment but no servers to assign to");
1403      return null;
1404    }
1405    if (numServers == 1) { // Only one server, nothing fancy we can do here
1406      ServerName server = servers.get(0);
1407      assignments.put(server, new ArrayList<>(regions.keySet()));
1408      return assignments;
1409    }
1410
1411    // Group all of the old assignments by their hostname.
1412    // We can't group directly by ServerName since the servers all have
1413    // new start-codes.
1414
1415    // Group the servers by their hostname. It's possible we have multiple
1416    // servers on the same host on different ports.
1417    ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
1418    for (ServerName server : servers) {
1419      assignments.put(server, new ArrayList<>());
1420      serversByHostname.put(server.getHostnameLowerCase(), server);
1421    }
1422
1423    // Collection of the hostnames that used to have regions
1424    // assigned, but for which we no longer have any RS running
1425    // after the cluster restart.
1426    Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
1427
1428    // If the old servers aren't present, lets assign those regions later.
1429    List<RegionInfo> randomAssignRegions = Lists.newArrayList();
1430
1431    int numRandomAssignments = 0;
1432    int numRetainedAssigments = 0;
1433    for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
1434      RegionInfo region = entry.getKey();
1435      ServerName oldServerName = entry.getValue();
1436      List<ServerName> localServers = new ArrayList<>();
1437      if (oldServerName != null) {
1438        localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
1439      }
1440      if (localServers.isEmpty()) {
1441        // No servers on the new cluster match up with this hostname, assign randomly, later.
1442        randomAssignRegions.add(region);
1443        if (oldServerName != null) {
1444          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
1445        }
1446      } else if (localServers.size() == 1) {
1447        // the usual case - one new server on same host
1448        ServerName target = localServers.get(0);
1449        assignments.get(target).add(region);
1450        numRetainedAssigments++;
1451      } else {
1452        // multiple new servers in the cluster on this same host
1453        if (localServers.contains(oldServerName)) {
1454          assignments.get(oldServerName).add(region);
1455          numRetainedAssigments++;
1456        } else {
1457          ServerName target = null;
1458          for (ServerName tmp : localServers) {
1459            if (tmp.getPort() == oldServerName.getPort()) {
1460              target = tmp;
1461              assignments.get(tmp).add(region);
1462              numRetainedAssigments++;
1463              break;
1464            }
1465          }
1466          if (target == null) {
1467            randomAssignRegions.add(region);
1468          }
1469        }
1470      }
1471    }
1472
1473    // If servers from prior assignment aren't present, then lets do randomAssignment on regions.
1474    if (randomAssignRegions.size() > 0) {
1475      Cluster cluster = createCluster(servers, regions.keySet());
1476      for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
1477        ServerName sn = entry.getKey();
1478        for (RegionInfo region : entry.getValue()) {
1479          cluster.doAssignRegion(region, sn);
1480        }
1481      }
1482      for (RegionInfo region : randomAssignRegions) {
1483        ServerName target = randomAssignment(cluster, region, servers);
1484        assignments.get(target).add(region);
1485        numRandomAssignments++;
1486      }
1487    }
1488
1489    String randomAssignMsg = "";
1490    if (numRandomAssignments > 0) {
1491      randomAssignMsg =
1492          numRandomAssignments + " regions were assigned "
1493              + "to random hosts, since the old hosts for these regions are no "
1494              + "longer present in the cluster. These hosts were:\n  "
1495              + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
1496    }
1497
1498    LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
1499        + " retained the pre-restart assignment. " + randomAssignMsg);
1500    return assignments;
1501  }
1502
1503  @Override
1504  public void initialize() throws HBaseIOException{
1505  }
1506
1507  @Override
1508  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
1509  }
1510
1511  @Override
1512  public void regionOffline(RegionInfo regionInfo) {
1513  }
1514
1515  @Override
1516  public boolean isStopped() {
1517    return stopped;
1518  }
1519
1520  @Override
1521  public void stop(String why) {
1522    LOG.info("Load Balancer stop requested: "+why);
1523    stopped = true;
1524  }
1525
1526  /**
1527  * Updates the balancer status tag reported to JMX
1528  */
1529  public void updateBalancerStatus(boolean status) {
1530    metricsBalancer.balancerStatus(status);
1531  }
1532
1533  /**
1534   * Used to assign a single region to a random server.
1535   */
1536  private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
1537      List<ServerName> servers) {
1538    int numServers = servers.size(); // servers is not null, numServers > 1
1539    ServerName sn = null;
1540    final int maxIterations = numServers * 4;
1541    int iterations = 0;
1542    List<ServerName> usedSNs = new ArrayList<>(servers.size());
1543    do {
1544      int i = RANDOM.nextInt(numServers);
1545      sn = servers.get(i);
1546      if (!usedSNs.contains(sn)) {
1547        usedSNs.add(sn);
1548      }
1549    } while (cluster.wouldLowerAvailability(regionInfo, sn)
1550        && iterations++ < maxIterations);
1551    if (iterations >= maxIterations) {
1552      // We have reached the max. Means the servers that we collected is still lowering the
1553      // availability
1554      for (ServerName unusedServer : servers) {
1555        if (!usedSNs.contains(unusedServer)) {
1556          // check if any other unused server is there for us to use.
1557          // If so use it. Else we have not other go but to go with one of them
1558          if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
1559            sn = unusedServer;
1560            break;
1561          }
1562        }
1563      }
1564    }
1565    cluster.doAssignRegion(regionInfo, sn);
1566    return sn;
1567  }
1568
1569  /**
1570   * Round robin a list of regions to a list of servers
1571   */
1572  private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
1573    List<ServerName> servers, Map<ServerName, List<RegionInfo>> assignments) {
1574    List<RegionInfo> unassignedRegions = new ArrayList<>();
1575    int numServers = servers.size();
1576    int numRegions = regions.size();
1577    int max = (int) Math.ceil((float) numRegions / numServers);
1578    int serverIdx = 0;
1579    if (numServers > 1) {
1580      serverIdx = RANDOM.nextInt(numServers);
1581    }
1582    int regionIdx = 0;
1583    for (int j = 0; j < numServers; j++) {
1584      ServerName server = servers.get((j + serverIdx) % numServers);
1585      List<RegionInfo> serverRegions = new ArrayList<>(max);
1586      for (int i = regionIdx; i < numRegions; i += numServers) {
1587        RegionInfo region = regions.get(i % numRegions);
1588        if (cluster.wouldLowerAvailability(region, server)) {
1589          unassignedRegions.add(region);
1590        } else {
1591          serverRegions.add(region);
1592          cluster.doAssignRegion(region, server);
1593        }
1594      }
1595      assignments.put(server, serverRegions);
1596      regionIdx++;
1597    }
1598
1599
1600    List<RegionInfo> lastFewRegions = new ArrayList<>();
1601    // assign the remaining by going through the list and try to assign to servers one-by-one
1602    serverIdx = RANDOM.nextInt(numServers);
1603    OUTER : for (RegionInfo region : unassignedRegions) {
1604      boolean assigned = false;
1605      INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
1606        ServerName server = servers.get((j + serverIdx) % numServers);
1607        if (cluster.wouldLowerAvailability(region, server)) {
1608          continue INNER;
1609        } else {
1610          assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
1611          cluster.doAssignRegion(region, server);
1612          serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
1613          assigned = true;
1614          break;
1615        }
1616      }
1617      if (!assigned) {
1618        lastFewRegions.add(region);
1619      }
1620    }
1621    // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
1622    // make it optimal later. we can end up with this if numReplicas > numServers.
1623    for (RegionInfo region : lastFewRegions) {
1624      int i = RANDOM.nextInt(numServers);
1625      ServerName server = servers.get(i);
1626      assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
1627      cluster.doAssignRegion(region, server);
1628    }
1629  }
1630
1631  protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
1632    Collection<RegionInfo> regions) {
1633    if (this.services != null && this.services.getAssignmentManager() != null) {
1634      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
1635    } else {
1636      return new HashMap<>();
1637    }
1638  }
1639
1640  private Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad(
1641      Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable) {
1642    Map<ServerName, List<RegionInfo>> returnMap = new TreeMap<>();
1643    for (Map<ServerName, List<RegionInfo>> serverNameListMap : LoadOfAllTable.values()) {
1644      serverNameListMap.forEach((serverName, regionInfoList) -> {
1645        List<RegionInfo> regionInfos =
1646            returnMap.computeIfAbsent(serverName, k -> new ArrayList<>());
1647        regionInfos.addAll(regionInfoList);
1648      });
1649    }
1650    return returnMap;
1651  }
1652
1653  @Override
1654  public abstract List<RegionPlan> balanceTable(TableName tableName,
1655      Map<ServerName, List<RegionInfo>> loadOfOneTable);
1656
1657  @Override
1658  public List<RegionPlan>
1659      balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
1660    if (isByTable) {
1661      List<RegionPlan> result = new ArrayList<>();
1662      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
1663        LOG.info("Start Generate Balance plan for table: " + tableName);
1664        List<RegionPlan> partialPlans = balanceTable(tableName, loadOfOneTable);
1665        if (partialPlans != null) {
1666          result.addAll(partialPlans);
1667        }
1668      });
1669      return result;
1670    } else {
1671      LOG.info("Start Generate Balance plan for cluster.");
1672      return balanceTable(HConstants.ENSEMBLE_TABLE_NAME, toEnsumbleTableLoad(loadOfAllTable));
1673    }
1674  }
1675
1676  @Override
1677  public void onConfigurationChange(Configuration conf) {
1678  }
1679}