001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.balancer;
020
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Comparator;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.NavigableMap;
032import java.util.Random;
033import java.util.Set;
034import java.util.TreeMap;
035import java.util.function.Predicate;
036import java.util.stream.Collectors;
037
038import org.apache.commons.lang3.NotImplementedException;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.ClusterMetrics;
041import org.apache.hadoop.hbase.HBaseConfiguration;
042import org.apache.hadoop.hbase.HBaseIOException;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.HDFSBlocksDistribution;
045import org.apache.hadoop.hbase.ServerMetrics;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionReplicaUtil;
050import org.apache.hadoop.hbase.master.LoadBalancer;
051import org.apache.hadoop.hbase.master.MasterServices;
052import org.apache.hadoop.hbase.master.RackManager;
053import org.apache.hadoop.hbase.master.RegionPlan;
054import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
055import org.apache.hadoop.hbase.master.assignment.RegionStates;
056import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
057import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
058import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
059import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
060import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
061import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
062import org.apache.yetus.audience.InterfaceAudience;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
 * The base class for load balancers. It provides the functions used by
068 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
069 * in the edge cases. It doesn't provide an implementation of the
070 * actual balancing algorithm.
071 *
072 */
073@InterfaceAudience.Private
074public abstract class BaseLoadBalancer implements LoadBalancer {
  // Minimum number of servers required before balancing is attempted.
  protected static final int MIN_SERVER_BALANCE = 2;
  // Set when the balancer has been stopped; checked by balancing entry points.
  private volatile boolean stopped = false;

  // Shared immutable placeholder for "no unassigned regions".
  private static final List<RegionInfo> EMPTY_REGION_LIST = new ArrayList<>(0);

  // Matches servers that currently host no regions at all.
  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
    = load -> load.getRegionMetrics().isEmpty();

  // Supplies HDFS block locations per region; only created when locality is enabled.
  protected RegionLocationFinder regionFinder;
  // Whether locality-based placement is enabled (see createRegionFinder()).
  protected boolean useRegionFinder;
085
  /**
   * Fallback {@link RackManager} used when no rack topology is configured:
   * it places every server in the single {@code UNKNOWN_RACK}.
   */
  private static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      return UNKNOWN_RACK;
    }
  }
092
093  /**
094   * The constructor that uses the basic MetricsBalancer
095   */
096  protected BaseLoadBalancer() {
097    metricsBalancer = new MetricsBalancer();
098    createRegionFinder();
099  }
100
101  /**
102   * This Constructor accepts an instance of MetricsBalancer,
103   * which will be used instead of creating a new one
104   */
105  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
106    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
107    createRegionFinder();
108  }
109
  // Creates the RegionLocationFinder when locality-aware balancing is enabled.
  // NOTE(review): reads the `config` field, which is declared outside this view;
  // this assumes `config` is initialized before the constructors run — confirm.
  private void createRegionFinder() {
    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
    if (useRegionFinder) {
      regionFinder = new RegionLocationFinder();
    }
  }
116
117  /**
118   * An efficient array based implementation similar to ClusterState for keeping
119   * the status of the cluster in terms of region assignment and distribution.
120   * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
121   * hundreds of thousands of hashmap manipulations are very costly, which is why this
122   * class uses mostly indexes and arrays.
123   *
124   * Cluster tracks a list of unassigned regions, region assignments, and the server
125   * topology in terms of server names, hostnames and racks.
126   */
127  protected static class Cluster {
    ServerName[] servers;
    String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
    String[] racks;
    boolean multiServersPerHost = false; // whether or not any host has more than one server

    ArrayList<String> tables;            // tableIndex -> table name
    RegionInfo[] regions;                // regionIndex -> RegionInfo
    Deque<BalancerRegionLoad>[] regionLoads; // regionIndex -> historical load samples (may be null)
    private RegionLocationFinder regionFinder;

    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality

    int[]   serverIndexToHostIndex;      //serverIndex -> host index
    int[]   serverIndexToRackIndex;      //serverIndex -> rack index

    int[][] regionsPerServer;            //serverIndex -> region list
    int[][] regionsPerHost;              //hostIndex -> list of regions
    int[][] regionsPerRack;              //rackIndex -> region list
    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerHost;   //hostIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerRack;   //rackIndex -> sorted list of regions by primary region index

    int[][] serversPerHost;              //hostIndex -> list of server indexes
    int[][] serversPerRack;              //rackIndex -> list of server indexes
    int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
    int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
    int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
    int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
    int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
    boolean hasRegionReplicas = false;   //whether there is regions with replicas

    // Server index orderings used by balancing heuristics; both sorted lazily elsewhere.
    Integer[] serverIndicesSortedByRegionCount;
    Integer[] serverIndicesSortedByLocality;

    // Reverse lookups from name/info to the dense indexes used everywhere above.
    Map<String, Integer> serversToIndex;
    Map<String, Integer> hostsToIndex;
    Map<String, Integer> racksToIndex;
    Map<String, Integer> tablesToIndex;
    Map<RegionInfo, Integer> regionsToIndex;
    float[] localityPerServer;           // serverIndex -> aggregate locality

    int numServers;
    int numHosts;
    int numRacks;
    int numTables;
    int numRegions;

    int numMovedRegions = 0; //num moved regions from the initial configuration
    Map<ServerName, List<RegionInfo>> clusterState;

    protected final RackManager rackManager;
    // Maps region -> rackIndex -> locality of region on rack
    private float[][] rackLocalities;
    // Maps localityType -> region -> [server|rack]Index with highest locality
    private int[][] regionsToMostLocalEntities;
184
    /** Convenience constructor for a cluster with no unassigned regions. */
    protected Cluster(
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {
      this(null, clusterState, loads, regionFinder, rackManager);
    }
192
    /**
     * Builds the dense, array-based snapshot of the cluster.
     * Phases: (1) index servers/hosts/racks, (2) size and allocate all arrays,
     * (3) register every region (assigned then unassigned), (4) derive the
     * per-host/per-rack/per-table aggregates used by the balancing algorithms.
     */
    @SuppressWarnings("unchecked")
    protected Cluster(
        Collection<RegionInfo> unassignedRegions,
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {

      if (unassignedRegions == null) {
        unassignedRegions = EMPTY_REGION_LIST;
      }

      serversToIndex = new HashMap<>();
      hostsToIndex = new HashMap<>();
      racksToIndex = new HashMap<>();
      tablesToIndex = new HashMap<>();

      //TODO: We should get the list of tables from master
      tables = new ArrayList<>();
      this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

      numRegions = 0;

      List<List<Integer>> serversPerHostList = new ArrayList<>();
      List<List<Integer>> serversPerRackList = new ArrayList<>();
      this.clusterState = clusterState;
      this.regionFinder = regionFinder;

      // Use servername and port as there can be dead servers in this list. We want everything with
      // a matching hostname and port to have the same index.
      for (ServerName sn : clusterState.keySet()) {
        if (sn == null) {
          // NOTE(review): stray ')' inside this log message — looks like a typo
          // in the runtime string; left as-is since changing it alters output.
          LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " +
              "skipping; unassigned regions?");
          if (LOG.isTraceEnabled()) {
            LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
          }
          continue;
        }
        // getAddress().toString() and getHostAndPort() are presumably the same
        // "host:port" key — TODO confirm; they are used interchangeably here.
        if (serversToIndex.get(sn.getAddress().toString()) == null) {
          serversToIndex.put(sn.getHostAndPort(), numServers++);
        }
        if (!hostsToIndex.containsKey(sn.getHostname())) {
          hostsToIndex.put(sn.getHostname(), numHosts++);
          serversPerHostList.add(new ArrayList<>(1));
        }

        int serverIndex = serversToIndex.get(sn.getHostAndPort());
        int hostIndex = hostsToIndex.get(sn.getHostname());
        serversPerHostList.get(hostIndex).add(serverIndex);

        String rack = this.rackManager.getRack(sn);
        if (!racksToIndex.containsKey(rack)) {
          racksToIndex.put(rack, numRacks++);
          serversPerRackList.add(new ArrayList<>());
        }
        int rackIndex = racksToIndex.get(rack);
        serversPerRackList.get(rackIndex).add(serverIndex);
      }

      // Count how many regions there are.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        numRegions += entry.getValue().size();
      }
      numRegions += unassignedRegions.size();

      // Allocate every index structure now that the counts are known.
      regionsToIndex = new HashMap<>(numRegions);
      servers = new ServerName[numServers];
      serversPerHost = new int[numHosts][];
      serversPerRack = new int[numRacks][];
      regions = new RegionInfo[numRegions];
      regionIndexToServerIndex = new int[numRegions];
      initialRegionIndexToServerIndex = new int[numRegions];
      regionIndexToTableIndex = new int[numRegions];
      regionIndexToPrimaryIndex = new int[numRegions];
      regionLoads = new Deque[numRegions];

      regionLocations = new int[numRegions][];
      serverIndicesSortedByRegionCount = new Integer[numServers];
      serverIndicesSortedByLocality = new Integer[numServers];
      localityPerServer = new float[numServers];

      serverIndexToHostIndex = new int[numServers];
      serverIndexToRackIndex = new int[numServers];
      regionsPerServer = new int[numServers][];
      regionsPerHost = new int[numHosts][];
      regionsPerRack = new int[numRacks][];
      primariesOfRegionsPerServer = new int[numServers][];
      primariesOfRegionsPerHost = new int[numHosts][];
      primariesOfRegionsPerRack = new int[numRacks][];

      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;

      // First pass over servers: pick the freshest ServerName per host:port and
      // size regionsPerServer for the combined region count of duplicates.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        if (entry.getKey() == null) {
          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
          continue;
        }
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());

        // keep the servername if this is the first server name for this hostname
        // or this servername has the newest startcode.
        if (servers[serverIndex] == null ||
            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
          servers[serverIndex] = entry.getKey();
        }

        if (regionsPerServer[serverIndex] != null) {
          // there is another server with the same hostAndPort in ClusterState.
          // allocate the array for the total size
          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
        } else {
          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
        }
        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
        serverIndicesSortedByLocality[serverIndex] = serverIndex;
      }

      // Invert the name->index maps into dense arrays.
      hosts = new String[numHosts];
      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
        hosts[entry.getValue()] = entry.getKey();
      }
      racks = new String[numRacks];
      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
        racks[entry.getValue()] = entry.getKey();
      }

      // Second pass: register each assigned region and fill regionsPerServer.
      // NOTE(review): regionPerServerIndex restarts at 0 for every entry, so if
      // two ServerName entries share a host:port the later entry overwrites the
      // earlier one's slots in regionsPerServer — confirm this is intended.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
        regionPerServerIndex = 0;

        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
        serverIndexToHostIndex[serverIndex] = hostIndex;

        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
        serverIndexToRackIndex[serverIndex] = rackIndex;

        for (RegionInfo region : entry.getValue()) {
          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
          regionIndex++;
        }
      }

      // Unassigned regions get serverIndex -1.
      for (RegionInfo region : unassignedRegions) {
        registerRegion(region, regionIndex, -1, loads, regionFinder);
        regionIndex++;
      }

      for (int i = 0; i < serversPerHostList.size(); i++) {
        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
        for (int j = 0; j < serversPerHost[i].length; j++) {
          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
        }
        if (serversPerHost[i].length > 1) {
          multiServersPerHost = true;
        }
      }

      for (int i = 0; i < serversPerRackList.size(); i++) {
        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
        for (int j = 0; j < serversPerRack[i].length; j++) {
          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
        }
      }

      // Per-table region counts per server, and the per-table maximum.
      numTables = tables.size();
      numRegionsPerServerPerTable = new int[numServers][numTables];

      for (int i = 0; i < numServers; i++) {
        for (int j = 0; j < numTables; j++) {
          numRegionsPerServerPerTable[i][j] = 0;
        }
      }

      for (int i=0; i < regionIndexToServerIndex.length; i++) {
        if (regionIndexToServerIndex[i] >= 0) {
          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
        }
      }

      numMaxRegionsPerTable = new int[numTables];
      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // Map every region to its primary replica's index (-1 if the primary is
      // not in this snapshot); a primary maps to itself.
      for (int i = 0; i < regions.length; i ++) {
        RegionInfo info = regions[i];
        if (RegionReplicaUtil.isDefaultReplica(info)) {
          regionIndexToPrimaryIndex[i] = i;
        } else {
          hasRegionReplicas = true;
          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
        }
      }

      for (int i = 0; i < regionsPerServer.length; i++) {
        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
        for (int j = 0; j < regionsPerServer[i].length; j++) {
          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
          primariesOfRegionsPerServer[i][j] = primaryIndex;
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerServer[i]);
      }

      // compute regionsPerHost
      if (multiServersPerHost) {
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionsPerHost = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
          }
          regionsPerHost[i] = new int[numRegionsPerHost];
          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
        }
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionPerHostIndex = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
              int region = regionsPerServer[serversPerHost[i][j]][k];
              regionsPerHost[i][numRegionPerHostIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
              numRegionPerHostIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerHost[i]);
        }
      }

      // compute regionsPerRack
      if (numRacks > 1) {
        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionsPerRack = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
          }
          regionsPerRack[i] = new int[numRegionsPerRack];
          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
        }

        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionPerRackIndex = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
              int region = regionsPerServer[serversPerRack[i][j]][k];
              regionsPerRack[i][numRegionPerRackIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
              numRegionPerRackIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerRack[i]);
        }
      }
    }
458
459    /** Helper for Cluster constructor to handle a region */
460    private void registerRegion(RegionInfo region, int regionIndex,
461        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
462        RegionLocationFinder regionFinder) {
463      String tableName = region.getTable().getNameAsString();
464      if (!tablesToIndex.containsKey(tableName)) {
465        tables.add(tableName);
466        tablesToIndex.put(tableName, tablesToIndex.size());
467      }
468      int tableIndex = tablesToIndex.get(tableName);
469
470      regionsToIndex.put(region, regionIndex);
471      regions[regionIndex] = region;
472      regionIndexToServerIndex[regionIndex] = serverIndex;
473      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
474      regionIndexToTableIndex[regionIndex] = tableIndex;
475
476      // region load
477      if (loads != null) {
478        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
479        // That could have failed if the RegionLoad is using the other regionName
480        if (rl == null) {
481          // Try getting the region load using encoded name.
482          rl = loads.get(region.getEncodedName());
483        }
484        regionLoads[regionIndex] = rl;
485      }
486
487      if (regionFinder != null) {
488        // region location
489        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
490        regionLocations[regionIndex] = new int[loc.size()];
491        for (int i = 0; i < loc.size(); i++) {
492          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
493              : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
494                  : serversToIndex.get(loc.get(i).getHostAndPort()));
495        }
496      }
497    }
498
499    /**
500     * Returns true iff a given server has less regions than the balanced amount
501     */
502    public boolean serverHasTooFewRegions(int server) {
503      int minLoad = this.numRegions / numServers;
504      int numRegions = getNumRegions(server);
505      return numRegions < minLoad;
506    }
507
508    /**
509     * Retrieves and lazily initializes a field storing the locality of
510     * every region/server combination
511     */
512    public float[][] getOrComputeRackLocalities() {
513      if (rackLocalities == null || regionsToMostLocalEntities == null) {
514        computeCachedLocalities();
515      }
516      return rackLocalities;
517    }
518
519    /**
520     * Lazily initializes and retrieves a mapping of region -> server for which region has
521     * the highest the locality
522     */
523    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
524      if (rackLocalities == null || regionsToMostLocalEntities == null) {
525        computeCachedLocalities();
526      }
527      return regionsToMostLocalEntities[type.ordinal()];
528    }
529
530    /**
531     * Looks up locality from cache of localities. Will create cache if it does
532     * not already exist.
533     */
534    public float getOrComputeLocality(int region, int entity, LocalityType type) {
535      switch (type) {
536        case SERVER:
537          return getLocalityOfRegion(region, entity);
538        case RACK:
539          return getOrComputeRackLocalities()[region][entity];
540        default:
541          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
542      }
543    }
544
545    /**
546     * Returns locality weighted by region size in MB. Will create locality cache
547     * if it does not already exist.
548     */
549    public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
550      return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
551    }
552
553    /**
554     * Returns the size in MB from the most recent RegionLoad for region
555     */
556    public int getRegionSizeMB(int region) {
557      Deque<BalancerRegionLoad> load = regionLoads[region];
558      // This means regions have no actual data on disk
559      if (load == null) {
560        return 0;
561      }
562      return regionLoads[region].getLast().getStorefileSizeMB();
563    }
564
565    /**
566     * Computes and caches the locality for each region/rack combinations,
567     * as well as storing a mapping of region -> server and region -> rack such that server
568     * and rack have the highest locality for region
569     */
570    private void computeCachedLocalities() {
571      rackLocalities = new float[numRegions][numRacks];
572      regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
573
574      // Compute localities and find most local server per region
575      for (int region = 0; region < numRegions; region++) {
576        int serverWithBestLocality = 0;
577        float bestLocalityForRegion = 0;
578        for (int server = 0; server < numServers; server++) {
579          // Aggregate per-rack locality
580          float locality = getLocalityOfRegion(region, server);
581          int rack = serverIndexToRackIndex[server];
582          int numServersInRack = serversPerRack[rack].length;
583          rackLocalities[region][rack] += locality / numServersInRack;
584
585          if (locality > bestLocalityForRegion) {
586            serverWithBestLocality = server;
587            bestLocalityForRegion = locality;
588          }
589        }
590        regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
591
592        // Find most local rack per region
593        int rackWithBestLocality = 0;
594        float bestRackLocalityForRegion = 0.0f;
595        for (int rack = 0; rack < numRacks; rack++) {
596          float rackLocality = rackLocalities[region][rack];
597          if (rackLocality > bestRackLocalityForRegion) {
598            bestRackLocalityForRegion = rackLocality;
599            rackWithBestLocality = rack;
600          }
601        }
602        regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
603      }
604
605    }
606
607    /**
608     * Maps region index to rack index
609     */
610    public int getRackForRegion(int region) {
611      return serverIndexToRackIndex[regionIndexToServerIndex[region]];
612    }
613
    /** The entity a locality value is measured against: a single server or a whole rack. */
    enum LocalityType {
      SERVER,
      RACK
    }
618
    /** An action to move or swap a region */
    public static class Action {
      public enum Type {
        ASSIGN_REGION,
        MOVE_REGION,
        SWAP_REGIONS,
        NULL,
      }

      public Type type;
      public Action (Type type) {this.type = type;}
      /** Returns an Action which would undo this action */
      // Base class is its own undo: a NULL action undone is still a no-op.
      public Action undoAction() { return this; }
      @Override
      public String toString() { return type + ":";}
    }
635
636    public static class AssignRegionAction extends Action {
637      public int region;
638      public int server;
639      public AssignRegionAction(int region, int server) {
640        super(Type.ASSIGN_REGION);
641        this.region = region;
642        this.server = server;
643      }
644      @Override
645      public Action undoAction() {
646        // TODO implement this. This action is not being used by the StochasticLB for now
647        // in case it uses it, we should implement this function.
648        throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
649      }
650      @Override
651      public String toString() {
652        return type + ": " + region + ":" + server;
653      }
654    }
655
656    public static class MoveRegionAction extends Action {
657      public int region;
658      public int fromServer;
659      public int toServer;
660
661      public MoveRegionAction(int region, int fromServer, int toServer) {
662        super(Type.MOVE_REGION);
663        this.fromServer = fromServer;
664        this.region = region;
665        this.toServer = toServer;
666      }
667      @Override
668      public Action undoAction() {
669        return new MoveRegionAction (region, toServer, fromServer);
670      }
671      @Override
672      public String toString() {
673        return type + ": " + region + ":" + fromServer + " -> " + toServer;
674      }
675    }
676
677    public static class SwapRegionsAction extends Action {
678      public int fromServer;
679      public int fromRegion;
680      public int toServer;
681      public int toRegion;
682      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
683        super(Type.SWAP_REGIONS);
684        this.fromServer = fromServer;
685        this.fromRegion = fromRegion;
686        this.toServer = toServer;
687        this.toRegion = toRegion;
688      }
689      @Override
690      public Action undoAction() {
691        return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
692      }
693      @Override
694      public String toString() {
695        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
696      }
697    }
698
    // Shared no-op sentinel so callers can always return a non-null Action.
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION",
        justification="Mistake. Too disruptive to change now")
    public static final Action NullAction = new Action(Type.NULL);
702
    /**
     * Applies the given action to this cluster's in-memory state, updating the
     * per-server region arrays and the derived bookkeeping via regionMoved().
     */
    public void doAction(Action action) {
      switch (action.type) {
      case NULL: break;
      case ASSIGN_REGION:
        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
        assert action instanceof AssignRegionAction: action.getClass();
        AssignRegionAction ar = (AssignRegionAction) action;
        // -1 source index means the region was previously unassigned.
        regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
        regionMoved(ar.region, -1, ar.server);
        break;
      case MOVE_REGION:
        assert action instanceof MoveRegionAction: action.getClass();
        MoveRegionAction mra = (MoveRegionAction) action;
        regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
        regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
        regionMoved(mra.region, mra.fromServer, mra.toServer);
        break;
      case SWAP_REGIONS:
        assert action instanceof SwapRegionsAction: action.getClass();
        SwapRegionsAction a = (SwapRegionsAction) action;
        // A swap is two in-place replacements plus two bookkeeping moves.
        regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
        regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
        regionMoved(a.fromRegion, a.fromServer, a.toServer);
        regionMoved(a.toRegion, a.toServer, a.fromServer);
        break;
      default:
        throw new RuntimeException("Uknown action:" + action.type);
      }
    }
732
733    /**
734     * Return true if the placement of region on server would lower the availability
735     * of the region in question
736     * @return true or false
737     */
738    boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
739      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
740        return false; // safeguard against race between cluster.servers and servers from LB method args
741      }
742      int server = serversToIndex.get(serverName.getHostAndPort());
743      int region = regionsToIndex.get(regionInfo);
744
745      int primary = regionIndexToPrimaryIndex[region];
746      // there is a subset relation for server < host < rack
747      // check server first
748
749      if (contains(primariesOfRegionsPerServer[server], primary)) {
750        // check for whether there are other servers that we can place this region
751        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
752          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
753            return true; // meaning there is a better server
754          }
755        }
756        return false; // there is not a better server to place this
757      }
758
759      // check host
760      if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host
761        int host = serverIndexToHostIndex[server];
762        if (contains(primariesOfRegionsPerHost[host], primary)) {
763          // check for whether there are other hosts that we can place this region
764          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
765            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
766              return true; // meaning there is a better host
767            }
768          }
769          return false; // there is not a better host to place this
770        }
771      }
772
773      // check rack
774      if (numRacks > 1) {
775        int rack = serverIndexToRackIndex[server];
776        if (contains(primariesOfRegionsPerRack[rack], primary)) {
777          // check for whether there are other racks that we can place this region
778          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
779            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
780              return true; // meaning there is a better rack
781            }
782          }
783          return false; // there is not a better rack to place this
784        }
785      }
786      return false;
787    }
788
    /**
     * Applies an assignment of {@code regionInfo} to {@code serverName} to this
     * cluster model via {@code doAction}. No-op when the server is unknown
     * (guards against races with the live server list).
     */
    void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
        return;
      }
      int server = serversToIndex.get(serverName.getHostAndPort());
      int region = regionsToIndex.get(regionInfo);
      doAction(new AssignRegionAction(region, server));
    }
797
    /**
     * Updates all bookkeeping structures after {@code region} moves from
     * {@code oldServer} to {@code newServer}: moved-region count, per-table
     * per-server counts (and the per-table max), and the region/primary
     * membership arrays for servers, hosts and racks.
     * @param oldServer previous server index, or -1 for a fresh assignment
     */
    void regionMoved(int region, int oldServer, int newServer) {
      regionIndexToServerIndex[region] = newServer;
      if (initialRegionIndexToServerIndex[region] == newServer) {
        numMovedRegions--; //region moved back to original location
      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
        numMovedRegions++; //region moved from original location
      }
      int tableIndex = regionIndexToTableIndex[region];
      if (oldServer >= 0) {
        numRegionsPerServerPerTable[oldServer][tableIndex]--;
      }
      numRegionsPerServerPerTable[newServer][tableIndex]++;

      //check whether this caused maxRegionsPerTable in the new Server to be updated
      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
          == numMaxRegionsPerTable[tableIndex]) {
        //recompute maxRegionsPerTable since the previous value was coming from the old server
        numMaxRegionsPerTable[tableIndex] = 0;
        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // update for servers
      int primary = regionIndexToPrimaryIndex[region];
      if (oldServer >= 0) {
        primariesOfRegionsPerServer[oldServer] = removeRegion(
          primariesOfRegionsPerServer[oldServer], primary);
      }
      primariesOfRegionsPerServer[newServer] = addRegionSorted(
        primariesOfRegionsPerServer[newServer], primary);

      // update for hosts
      if (multiServersPerHost) {
        // Host-level arrays only change when the move crosses host boundaries.
        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
        int newHost = serverIndexToHostIndex[newServer];
        if (newHost != oldHost) {
          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
          if (oldHost >= 0) {
            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
            primariesOfRegionsPerHost[oldHost] = removeRegion(
              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
          }
        }
      }

      // update for racks
      if (numRacks > 1) {
        // Rack-level arrays only change when the move crosses rack boundaries.
        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
        int newRack = serverIndexToRackIndex[newServer];
        if (newRack != oldRack) {
          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
          if (oldRack >= 0) {
            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
            primariesOfRegionsPerRack[oldRack] = removeRegion(
              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
          }
        }
      }
    }
864
865    int[] removeRegion(int[] regions, int regionIndex) {
866      //TODO: this maybe costly. Consider using linked lists
867      int[] newRegions = new int[regions.length - 1];
868      int i = 0;
869      for (i = 0; i < regions.length; i++) {
870        if (regions[i] == regionIndex) {
871          break;
872        }
873        newRegions[i] = regions[i];
874      }
875      System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
876      return newRegions;
877    }
878
879    int[] addRegion(int[] regions, int regionIndex) {
880      int[] newRegions = new int[regions.length + 1];
881      System.arraycopy(regions, 0, newRegions, 0, regions.length);
882      newRegions[newRegions.length - 1] = regionIndex;
883      return newRegions;
884    }
885
886    int[] addRegionSorted(int[] regions, int regionIndex) {
887      int[] newRegions = new int[regions.length + 1];
888      int i = 0;
889      for (i = 0; i < regions.length; i++) { // find the index to insert
890        if (regions[i] > regionIndex) {
891          break;
892        }
893      }
894      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
895      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
896      newRegions[i] = regionIndex;
897
898      return newRegions;
899    }
900
901    int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
902      int i = 0;
903      for (i = 0; i < regions.length; i++) {
904        if (regions[i] == regionIndex) {
905          regions[i] = newRegionIndex;
906          break;
907        }
908      }
909      return regions;
910    }
911
    /** Re-sorts {@code serverIndicesSortedByRegionCount} by ascending current region count. */
    void sortServersByRegionCount() {
      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
    }
915
    /** Number of regions currently hosted by the server with the given index. */
    int getNumRegions(int server) {
      return regionsPerServer[server].length;
    }
919
    /**
     * Returns true if {@code arr} contains {@code val}. Uses binary search, so
     * {@code arr} must be sorted ascending (the primaries arrays are kept sorted).
     */
    boolean contains(int[] arr, int val) {
      return Arrays.binarySearch(arr, val) >= 0;
    }
923
    // Orders server indices by how many regions each currently hosts (ascending).
    private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
925
    /**
     * Returns the index of the region with the lowest HDFS block locality on
     * the given server, skipping regions with no blocks. Returns -1 when the
     * server hosts no regions, every hosted region is empty, no region has
     * locality strictly below 1.0, or no region finder is configured.
     */
    int getLowestLocalityRegionOnServer(int serverIndex) {
      if (regionFinder != null) {
        float lowestLocality = 1.0f;
        int lowestLocalityRegionIndex = -1;
        if (regionsPerServer[serverIndex].length == 0) {
          // No regions on that region server
          return -1;
        }
        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
          int regionIndex = regionsPerServer[serverIndex][j];
          HDFSBlocksDistribution distribution = regionFinder
              .getBlockDistribution(regions[regionIndex]);
          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
          // skip empty region
          if (distribution.getUniqueBlocksTotalWeight() == 0) {
            continue;
          }
          if (locality < lowestLocality) {
            lowestLocality = locality;
            lowestLocalityRegionIndex = j;
          }
        }
        if (lowestLocalityRegionIndex == -1) {
          return -1;
        }
        if (LOG.isTraceEnabled()) {
          LOG.trace("Lowest locality region is "
              + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
                  .getRegionNameAsString() + " with locality " + lowestLocality
              + " and its region server contains " + regionsPerServer[serverIndex].length
              + " regions");
        }
        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
      } else {
        return -1;
      }
    }
963
964    float getLocalityOfRegion(int region, int server) {
965      if (regionFinder != null) {
966        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
967        return distribution.getBlockLocalityIndex(servers[server].getHostname());
968      } else {
969        return 0f;
970      }
971    }
972
    // Testing-only override of the cached region count.
    @VisibleForTesting
    protected void setNumRegions(int numRegions) {
      this.numRegions = numRegions;
    }
977
    // Testing-only override of the moved-region counter.
    @VisibleForTesting
    protected void setNumMovedRegions(int numMovedRegions) {
      this.numMovedRegions = numMovedRegions;
    }
982
983    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
984        justification="Not important but should be fixed")
985    @Override
986    public String toString() {
987      StringBuilder desc = new StringBuilder("Cluster={servers=[");
988      for(ServerName sn:servers) {
989        desc.append(sn.getHostAndPort()).append(", ");
990      }
991      desc.append("], serverIndicesSortedByRegionCount=")
992          .append(Arrays.toString(serverIndicesSortedByRegionCount))
993          .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));
994
995      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
996          .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
997          .append(", numTables=").append(numTables).append(", numMovedRegions=")
998          .append(numMovedRegions).append('}');
999      return desc.toString();
1000    }
1001  }
1002
  // slop for regions
  protected float slop;
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  protected float overallSlop;
  protected Configuration config = HBaseConfiguration.create();
  protected RackManager rackManager;
  // Shared PRNG for random server selection; seeded once at class load.
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
  protected MetricsBalancer metricsBalancer = null;
  protected ClusterMetrics clusterStatus = null;
  // ServerName of the active master; used to steer regions onto/off it.
  protected ServerName masterServerName;
  protected MasterServices services;
  // When true, only system-table regions may be placed on the master.
  protected boolean onlySystemTablesOnMaster;
  protected boolean maintenanceMode;
1017
1018  @Override
1019  public void setConf(Configuration conf) {
1020    this.config = conf;
1021    setSlop(conf);
1022    if (slop < 0) slop = 0;
1023    else if (slop > 1) slop = 1;
1024
1025    if (overallSlop < 0) overallSlop = 0;
1026    else if (overallSlop > 1) overallSlop = 1;
1027
1028    this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config);
1029
1030    this.rackManager = new RackManager(getConf());
1031    if (useRegionFinder) {
1032      regionFinder.setConf(conf);
1033    }
1034    // Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
1035    LOG.info("slop={}, systemTablesOnMaster={}",
1036        this.slop, this.onlySystemTablesOnMaster);
1037  }
1038
  /**
   * Reads the balance slop settings: "hbase.regions.slop" defaults to 0.2,
   * and "hbase.regions.overallSlop" defaults to the per-table slop value.
   */
  protected void setSlop(Configuration conf) {
    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
  }
1043
1044  /**
1045   * Check if a region belongs to some system table.
1046   * If so, the primary replica may be expected to be put on the master regionserver.
1047   */
1048  public boolean shouldBeOnMaster(RegionInfo region) {
1049    return (this.maintenanceMode || this.onlySystemTablesOnMaster)
1050        && region.getTable().isSystemTable();
1051  }
1052
1053  /**
1054   * Balance the regions that should be on master regionserver.
1055   */
1056  protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) {
1057    if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null;
1058    List<RegionPlan> plans = null;
1059    List<RegionInfo> regions = clusterMap.get(masterServerName);
1060    if (regions != null) {
1061      Iterator<ServerName> keyIt = null;
1062      for (RegionInfo region: regions) {
1063        if (shouldBeOnMaster(region)) continue;
1064
1065        // Find a non-master regionserver to host the region
1066        if (keyIt == null || !keyIt.hasNext()) {
1067          keyIt = clusterMap.keySet().iterator();
1068        }
1069        ServerName dest = keyIt.next();
1070        if (masterServerName.equals(dest)) {
1071          if (!keyIt.hasNext()) {
1072            keyIt = clusterMap.keySet().iterator();
1073          }
1074          dest = keyIt.next();
1075        }
1076
1077        // Move this region away from the master regionserver
1078        RegionPlan plan = new RegionPlan(region, masterServerName, dest);
1079        if (plans == null) {
1080          plans = new ArrayList<>();
1081        }
1082        plans.add(plan);
1083      }
1084    }
1085    for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) {
1086      if (masterServerName.equals(server.getKey())) continue;
1087      for (RegionInfo region: server.getValue()) {
1088        if (!shouldBeOnMaster(region)) continue;
1089
1090        // Move this region to the master regionserver
1091        RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName);
1092        if (plans == null) {
1093          plans = new ArrayList<>();
1094        }
1095        plans.add(plan);
1096      }
1097    }
1098    return plans;
1099  }
1100
1101  /**
1102   * If master is configured to carry system tables only, in here is
1103   * where we figure what to assign it.
1104   */
1105  protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions(
1106      Collection<RegionInfo> regions, List<ServerName> servers) {
1107    if (servers == null || regions == null || regions.isEmpty()) {
1108      return null;
1109    }
1110    Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
1111    if (this.maintenanceMode || this.onlySystemTablesOnMaster) {
1112      if (masterServerName != null && servers.contains(masterServerName)) {
1113        assignments.put(masterServerName, new ArrayList<>());
1114        for (RegionInfo region : regions) {
1115          if (shouldBeOnMaster(region)) {
1116            assignments.get(masterServerName).add(region);
1117          }
1118        }
1119      }
1120    }
1121    return assignments;
1122  }
1123
  /** Returns the configuration last applied via {@code setConf}. */
  @Override
  public Configuration getConf() {
    return this.config;
  }
1128
  /** Caches the latest cluster metrics and forwards them to the region finder, if any. */
  @Override
  public synchronized void setClusterMetrics(ClusterMetrics st) {
    this.clusterStatus = st;
    if (useRegionFinder) {
      regionFinder.setClusterMetrics(st);
    }
  }
1136
  @Override
  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad){
    // Intentionally a no-op here; subclasses that track per-table load override this.
  }
1141
  /** Wires in master services and caches the master's ServerName and maintenance state. */
  @Override
  public void setMasterServices(MasterServices masterServices) {
    masterServerName = masterServices.getServerName();
    this.services = masterServices;
    if (useRegionFinder) {
      this.regionFinder.setServices(masterServices);
    }
    // maintenanceMode is only ever latched to true here; it is never reset to false.
    if (this.services.isInMaintenanceMode()) {
      this.maintenanceMode = true;
    }
  }
1153
  /**
   * Warms the region finder's HDFS block-distribution data for all currently
   * assigned regions after master startup. Best effort only.
   */
  @Override
  public void postMasterStartupInitialize() {
    if (services != null && regionFinder != null) {
      try {
        Set<RegionInfo> regions =
            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
        regionFinder.refreshAndWait(regions);
      } catch (Exception e) {
        // Deliberately broad catch: a failed warm-up must not abort startup.
        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
      }
    }
  }
1166
  /** Overrides the rack manager (primarily for tests). */
  public void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }
1170
  /**
   * Returns true if the balancer should run: there must be at least
   * MIN_SERVER_BALANCE servers, and either some region replicas are co-located
   * or some server's load falls outside the slop-adjusted band around the
   * cluster's average load.
   */
  protected boolean needsBalance(Cluster c) {
    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not running balancer because only " + cs.getNumServers()
            + " active regionserver(s)");
      }
      return false;
    }
    if(areSomeRegionReplicasColocated(c)) return true;
    // Check if we even need to do any load balancing
    // HBASE-3681 check sloppiness first
    float average = cs.getLoadAverage(); // for logging
    int floor = (int) Math.floor(average * (1 - slop));
    int ceiling = (int) Math.ceil(average * (1 + slop));
    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because balanced cluster; " +
          "servers=" + cs.getNumServers() +
          " regions=" + cs.getNumRegions() + " average=" + average +
          " mostloaded=" + serversByLoad.lastKey().getLoad() +
          " leastloaded=" + serversByLoad.firstKey().getLoad());
      }
      return false;
    }
    return true;
  }
1200
1201  /**
1202   * Subclasses should implement this to return true if the cluster has nodes that hosts
1203   * multiple replicas for the same region, or, if there are multiple racks and the same
1204   * rack hosts replicas of the same region
1205   * @param c Cluster information
1206   * @return whether region replicas are currently co-located
1207   */
1208  protected boolean areSomeRegionReplicasColocated(Cluster c) {
1209    return false;
1210  }
1211
1212  /**
1213   * Generates a bulk assignment plan to be used on cluster startup using a
1214   * simple round-robin assignment.
1215   * <p>
1216   * Takes a list of all the regions and all the servers in the cluster and
1217   * returns a map of each server to the regions that it should be assigned.
1218   * <p>
1219   * Currently implemented as a round-robin assignment. Same invariant as load
1220   * balancing, all servers holding floor(avg) or ceiling(avg).
1221   *
1222   * TODO: Use block locations from HDFS to place regions with their blocks
1223   *
1224   * @param regions all regions
1225   * @param servers all servers
1226   * @return map of server to the regions it should take, or null if no
1227   *         assignment is possible (ie. no regions or no servers)
1228   */
1229  @Override
1230  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
1231      List<ServerName> servers) throws HBaseIOException {
1232    metricsBalancer.incrMiscInvocations();
1233    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
1234    if (assignments != null && !assignments.isEmpty()) {
1235      servers = new ArrayList<>(servers);
1236      // Guarantee not to put other regions on master
1237      servers.remove(masterServerName);
1238      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1239      if (!masterRegions.isEmpty()) {
1240        regions = new ArrayList<>(regions);
1241        regions.removeAll(masterRegions);
1242      }
1243    }
1244    if (this.maintenanceMode || regions == null || regions.isEmpty()) {
1245      return assignments;
1246    }
1247
1248    int numServers = servers == null ? 0 : servers.size();
1249    if (numServers == 0) {
1250      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
1251      return null;
1252    }
1253
1254    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
1255    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
1256    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
1257    // and balanced. This should also run fast with fewer number of iterations.
1258
1259    if (numServers == 1) { // Only one server, nothing fancy we can do here
1260      ServerName server = servers.get(0);
1261      assignments.put(server, new ArrayList<>(regions));
1262      return assignments;
1263    }
1264
1265    Cluster cluster = createCluster(servers, regions, false);
1266    List<RegionInfo> unassignedRegions = new ArrayList<>();
1267
1268    roundRobinAssignment(cluster, regions, unassignedRegions,
1269      servers, assignments);
1270
1271    List<RegionInfo> lastFewRegions = new ArrayList<>();
1272    // assign the remaining by going through the list and try to assign to servers one-by-one
1273    int serverIdx = RANDOM.nextInt(numServers);
1274    OUTER : for (RegionInfo region : unassignedRegions) {
1275      boolean assigned = false;
1276      INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
1277        ServerName serverName = servers.get((j + serverIdx) % numServers);
1278        if (!cluster.wouldLowerAvailability(region, serverName)) {
1279          List<RegionInfo> serverRegions =
1280              assignments.computeIfAbsent(serverName, k -> new ArrayList<>());
1281          if (!RegionReplicaUtil.isDefaultReplica(region.getReplicaId())) {
1282            // if the region is not a default replica
1283            // check if the assignments map has the other replica region on this server
1284            for (RegionInfo hri : serverRegions) {
1285              if (RegionReplicaUtil.isReplicasForSameRegion(region, hri)) {
1286                if (LOG.isTraceEnabled()) {
1287                  LOG.trace("Skipping the server, " + serverName
1288                      + " , got the same server for the region " + region);
1289                }
1290                // do not allow this case. The unassignedRegions we got because the
1291                // replica region in this list was not assigned because of lower availablity issue.
1292                // So when we assign here we should ensure that as far as possible the server being
1293                // selected does not have the server where the replica region was not assigned.
1294                continue INNER; // continue the inner loop, ie go to the next server
1295              }
1296            }
1297          }
1298          serverRegions.add(region);
1299          cluster.doAssignRegion(region, serverName);
1300          serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
1301          assigned = true;
1302          break;
1303        }
1304      }
1305      if (!assigned) {
1306        lastFewRegions.add(region);
1307      }
1308    }
1309    // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
1310    // make it optimal later. we can end up with this if numReplicas > numServers.
1311    for (RegionInfo region : lastFewRegions) {
1312      int i = RANDOM.nextInt(numServers);
1313      ServerName server = servers.get(i);
1314      List<RegionInfo> serverRegions = assignments.computeIfAbsent(server, k -> new ArrayList<>());
1315      serverRegions.add(region);
1316      cluster.doAssignRegion(region, server);
1317    }
1318    return assignments;
1319  }
1320
1321  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions,
1322      boolean hasRegionReplica) {
1323    // Get the snapshot of the current assignments for the regions in question, and then create
1324    // a cluster out of it. Note that we might have replicas already assigned to some servers
1325    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
1326    // replicas of the regions that are passed (for performance).
1327    Map<ServerName, List<RegionInfo>> clusterState = null;
1328    if (!hasRegionReplica) {
1329      clusterState = getRegionAssignmentsByServer(regions);
1330    } else {
1331      // for the case where we have region replica it is better we get the entire cluster's snapshot
1332      clusterState = getRegionAssignmentsByServer(null);
1333    }
1334
1335    for (ServerName server : servers) {
1336      if (!clusterState.containsKey(server)) {
1337        clusterState.put(server, EMPTY_REGION_LIST);
1338      }
1339    }
1340    return new Cluster(regions, clusterState, null, this.regionFinder,
1341        rackManager);
1342  }
1343
  /** Filters {@code servers} down to those the server manager considers idle. */
  private List<ServerName> findIdleServers(List<ServerName> servers) {
    return this.services.getServerManager()
            .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
  }
1348
1349  /**
1350   * Used to assign a single region to a random server.
1351   */
1352  @Override
1353  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
1354      throws HBaseIOException {
1355    metricsBalancer.incrMiscInvocations();
1356    if (servers != null && servers.contains(masterServerName)) {
1357      if (shouldBeOnMaster(regionInfo)) {
1358        return masterServerName;
1359      }
1360      if (!LoadBalancer.isTablesOnMaster(getConf())) {
1361        // Guarantee we do not put any regions on master
1362        servers = new ArrayList<>(servers);
1363        servers.remove(masterServerName);
1364      }
1365    }
1366
1367    int numServers = servers == null ? 0 : servers.size();
1368    if (numServers == 0) {
1369      LOG.warn("Wanted to retain assignment but no servers to assign to");
1370      return null;
1371    }
1372    if (numServers == 1) { // Only one server, nothing fancy we can do here
1373      return servers.get(0);
1374    }
1375    List<ServerName> idleServers = findIdleServers(servers);
1376    if (idleServers.size() == 1) {
1377      return idleServers.get(0);
1378    }
1379    final List<ServerName> finalServers = idleServers.isEmpty() ?
1380            servers : idleServers;
1381    List<RegionInfo> regions = Lists.newArrayList(regionInfo);
1382    Cluster cluster = createCluster(finalServers, regions, false);
1383    return randomAssignment(cluster, regionInfo, finalServers);
1384  }
1385
1386  /**
1387   * Generates a bulk assignment startup plan, attempting to reuse the existing
1388   * assignment information from META, but adjusting for the specified list of
1389   * available/online servers available for assignment.
1390   * <p>
1391   * Takes a map of all regions to their existing assignment from META. Also
1392   * takes a list of online servers for regions to be assigned to. Attempts to
1393   * retain all assignment, so in some instances initial assignment will not be
1394   * completely balanced.
1395   * <p>
1396   * Any leftover regions without an existing server to be assigned to will be
1397   * assigned randomly to available servers.
1398   *
1399   * @param regions regions and existing assignment from meta
1400   * @param servers available servers
1401   * @return map of servers and regions to be assigned to them
1402   */
1403  @Override
1404  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
1405      List<ServerName> servers) throws HBaseIOException {
1406    // Update metrics
1407    metricsBalancer.incrMiscInvocations();
1408    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions.keySet(), servers);
1409    if (assignments != null && !assignments.isEmpty()) {
1410      servers = new ArrayList<>(servers);
1411      // Guarantee not to put other regions on master
1412      servers.remove(masterServerName);
1413      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1414      regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey()))
1415          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
1416    }
1417    if (this.maintenanceMode || regions.isEmpty()) {
1418      return assignments;
1419    }
1420
1421    int numServers = servers == null ? 0 : servers.size();
1422    if (numServers == 0) {
1423      LOG.warn("Wanted to do retain assignment but no servers to assign to");
1424      return null;
1425    }
1426    if (numServers == 1) { // Only one server, nothing fancy we can do here
1427      ServerName server = servers.get(0);
1428      assignments.put(server, new ArrayList<>(regions.keySet()));
1429      return assignments;
1430    }
1431
1432    // Group all of the old assignments by their hostname.
1433    // We can't group directly by ServerName since the servers all have
1434    // new start-codes.
1435
1436    // Group the servers by their hostname. It's possible we have multiple
1437    // servers on the same host on different ports.
1438    ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
1439    for (ServerName server : servers) {
1440      assignments.put(server, new ArrayList<>());
1441      serversByHostname.put(server.getHostnameLowerCase(), server);
1442    }
1443
1444    // Collection of the hostnames that used to have regions
1445    // assigned, but for which we no longer have any RS running
1446    // after the cluster restart.
1447    Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
1448
1449    // If the old servers aren't present, lets assign those regions later.
1450    List<RegionInfo> randomAssignRegions = Lists.newArrayList();
1451
1452    int numRandomAssignments = 0;
1453    int numRetainedAssigments = 0;
1454    boolean hasRegionReplica = false;
1455    for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
1456      RegionInfo region = entry.getKey();
1457      ServerName oldServerName = entry.getValue();
1458      // In the current set of regions even if one has region replica let us go with
1459      // getting the entire snapshot
1460      if (this.services != null) { // for tests
1461        AssignmentManager am = this.services.getAssignmentManager();
1462        if (am != null) {
1463          RegionStates rss = am.getRegionStates();
1464          if (!hasRegionReplica && rss.isReplicaAvailableForRegion(region)) {
1465            hasRegionReplica = true;
1466          }
1467        }
1468      }
1469      List<ServerName> localServers = new ArrayList<>();
1470      if (oldServerName != null) {
1471        localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
1472      }
1473      if (localServers.isEmpty()) {
1474        // No servers on the new cluster match up with this hostname, assign randomly, later.
1475        randomAssignRegions.add(region);
1476        if (oldServerName != null) {
1477          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
1478        }
1479      } else if (localServers.size() == 1) {
1480        // the usual case - one new server on same host
1481        ServerName target = localServers.get(0);
1482        assignments.get(target).add(region);
1483        numRetainedAssigments++;
1484      } else {
1485        // multiple new servers in the cluster on this same host
1486        if (localServers.contains(oldServerName)) {
1487          assignments.get(oldServerName).add(region);
1488          numRetainedAssigments++;
1489        } else {
1490          ServerName target = null;
1491          for (ServerName tmp : localServers) {
1492            if (tmp.getPort() == oldServerName.getPort()) {
1493              target = tmp;
1494              assignments.get(tmp).add(region);
1495              numRetainedAssigments++;
1496              break;
1497            }
1498          }
1499          if (target == null) {
1500            randomAssignRegions.add(region);
1501          }
1502        }
1503      }
1504    }
1505
1506    // If servers from prior assignment aren't present, then lets do randomAssignment on regions.
1507    if (randomAssignRegions.size() > 0) {
1508      Cluster cluster = createCluster(servers, regions.keySet(), hasRegionReplica);
1509      for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
1510        ServerName sn = entry.getKey();
1511        for (RegionInfo region : entry.getValue()) {
1512          cluster.doAssignRegion(region, sn);
1513        }
1514      }
1515      for (RegionInfo region : randomAssignRegions) {
1516        ServerName target = randomAssignment(cluster, region, servers);
1517        assignments.get(target).add(region);
1518        numRandomAssignments++;
1519      }
1520    }
1521
1522    String randomAssignMsg = "";
1523    if (numRandomAssignments > 0) {
1524      randomAssignMsg =
1525          numRandomAssignments + " regions were assigned "
1526              + "to random hosts, since the old hosts for these regions are no "
1527              + "longer present in the cluster. These hosts were:\n  "
1528              + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
1529    }
1530
1531    LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
1532        + " retained the pre-restart assignment. " + randomAssignMsg);
1533    return assignments;
1534  }
1535
1536  @Override
1537  public void initialize() throws HBaseIOException{
1538  }
1539
  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
    // No-op: this balancer does not react to individual region-online events.
  }
1543
  @Override
  public void regionOffline(RegionInfo regionInfo) {
    // No-op: this balancer does not react to individual region-offline events.
  }
1547
  /**
   * Returns whether {@link #stop(String)} has been requested on this balancer.
   */
  @Override
  public boolean isStopped() {
    return stopped;
  }
1552
1553  @Override
1554  public void stop(String why) {
1555    LOG.info("Load Balancer stop requested: "+why);
1556    stopped = true;
1557  }
1558
1559  /**
1560  * Updates the balancer status tag reported to JMX
1561  */
1562  public void updateBalancerStatus(boolean status) {
1563    metricsBalancer.balancerStatus(status);
1564  }
1565
1566  /**
1567   * Used to assign a single region to a random server.
1568   */
1569  private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
1570      List<ServerName> servers) {
1571    int numServers = servers.size(); // servers is not null, numServers > 1
1572    ServerName sn = null;
1573    final int maxIterations = numServers * 4;
1574    int iterations = 0;
1575    List<ServerName> usedSNs = new ArrayList<>(servers.size());
1576    do {
1577      int i = RANDOM.nextInt(numServers);
1578      sn = servers.get(i);
1579      if (!usedSNs.contains(sn)) {
1580        usedSNs.add(sn);
1581      }
1582    } while (cluster.wouldLowerAvailability(regionInfo, sn)
1583        && iterations++ < maxIterations);
1584    if (iterations >= maxIterations) {
1585      // We have reached the max. Means the servers that we collected is still lowering the
1586      // availability
1587      for (ServerName unusedServer : servers) {
1588        if (!usedSNs.contains(unusedServer)) {
1589          // check if any other unused server is there for us to use.
1590          // If so use it. Else we have not other go but to go with one of them
1591          if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
1592            sn = unusedServer;
1593            break;
1594          }
1595        }
1596      }
1597    }
1598    cluster.doAssignRegion(regionInfo, sn);
1599    return sn;
1600  }
1601
1602  /**
1603   * Round robin a list of regions to a list of servers
1604   */
1605  private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
1606      List<RegionInfo> unassignedRegions, List<ServerName> servers,
1607      Map<ServerName, List<RegionInfo>> assignments) {
1608
1609    int numServers = servers.size();
1610    int numRegions = regions.size();
1611    int max = (int) Math.ceil((float) numRegions / numServers);
1612    int serverIdx = 0;
1613    if (numServers > 1) {
1614      serverIdx = RANDOM.nextInt(numServers);
1615    }
1616    int regionIdx = 0;
1617
1618    for (int j = 0; j < numServers; j++) {
1619      ServerName server = servers.get((j + serverIdx) % numServers);
1620      List<RegionInfo> serverRegions = new ArrayList<>(max);
1621      for (int i = regionIdx; i < numRegions; i += numServers) {
1622        RegionInfo region = regions.get(i % numRegions);
1623        if (cluster.wouldLowerAvailability(region, server)) {
1624          unassignedRegions.add(region);
1625        } else {
1626          serverRegions.add(region);
1627          cluster.doAssignRegion(region, server);
1628        }
1629      }
1630      assignments.put(server, serverRegions);
1631      regionIdx++;
1632    }
1633  }
1634
1635  protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
1636    Collection<RegionInfo> regions) {
1637    if (this.services != null && this.services.getAssignmentManager() != null) {
1638      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
1639    } else {
1640      return new HashMap<>();
1641    }
1642  }
1643
  @Override
  public void onConfigurationChange(Configuration conf) {
    // No-op: this balancer has no dynamically reloadable configuration here;
    // subclasses override to pick up config changes.
  }
1647}