001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.balancer;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.Comparator;
027import java.util.Deque;
028import java.util.HashMap;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Map;
032import java.util.Map.Entry;
033import java.util.NavigableMap;
034import java.util.Random;
035import java.util.Set;
036import java.util.TreeMap;
037import java.util.function.Predicate;
038import java.util.stream.Collectors;
039
040import org.apache.commons.lang3.NotImplementedException;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.ClusterMetrics;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseIOException;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HDFSBlocksDistribution;
047import org.apache.hadoop.hbase.ServerMetrics;
048import org.apache.hadoop.hbase.ServerName;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.RegionReplicaUtil;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.master.LoadBalancer;
054import org.apache.hadoop.hbase.master.MasterServices;
055import org.apache.hadoop.hbase.master.RackManager;
056import org.apache.hadoop.hbase.master.RegionPlan;
057import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
058import org.apache.hadoop.hbase.net.Address;
059import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
060import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
061import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
062import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
063import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
064import org.apache.yetus.audience.InterfaceAudience;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068/**
069 * The base class for load balancers. It provides the the functions used to by
070 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
071 * in the edge cases. It doesn't provide an implementation of the
072 * actual balancing algorithm.
073 *
074 */
075@InterfaceAudience.Private
076public abstract class BaseLoadBalancer implements LoadBalancer {
  // Presumably the minimum number of servers needed before balancing is attempted
  // — TODO(review): confirm at the use site (not visible in this chunk).
  protected static final int MIN_SERVER_BALANCE = 2;
  // Set when the balancer is stopped; volatile for cross-thread visibility.
  private volatile boolean stopped = false;

  // Shared immutable empty list used when no unassigned regions are supplied.
  private static final List<RegionInfo> EMPTY_REGION_LIST = Collections.emptyList();

  // True for a server that currently reports no region metrics, i.e. hosts no regions.
  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
    = load -> load.getRegionMetrics().isEmpty();

  // Locality lookup; only created when "hbase.master.balancer.uselocality" is true.
  protected RegionLocationFinder regionFinder;
  protected boolean useRegionFinder;
  protected boolean isByTable = false;
088
  /**
   * Fallback {@link RackManager} used when the caller supplies none: every
   * server is reported as living in the single unknown rack.
   */
  private static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      // All servers collapse into one rack, disabling rack-aware placement.
      return UNKNOWN_RACK;
    }
  }
095
096  /**
097   * The constructor that uses the basic MetricsBalancer
098   */
099  protected BaseLoadBalancer() {
100    metricsBalancer = new MetricsBalancer();
101    createRegionFinder();
102  }
103
104  /**
105   * This Constructor accepts an instance of MetricsBalancer,
106   * which will be used instead of creating a new one
107   */
108  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
109    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
110    createRegionFinder();
111  }
112
113  private void createRegionFinder() {
114    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
115    if (useRegionFinder) {
116      regionFinder = new RegionLocationFinder();
117    }
118  }
119
120  /**
121   * An efficient array based implementation similar to ClusterState for keeping
122   * the status of the cluster in terms of region assignment and distribution.
123   * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
124   * hundreds of thousands of hashmap manipulations are very costly, which is why this
125   * class uses mostly indexes and arrays.
126   *
127   * Cluster tracks a list of unassigned regions, region assignments, and the server
128   * topology in terms of server names, hostnames and racks.
129   */
130  protected static class Cluster {
    ServerName[] servers;
    String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
    String[] racks;
    boolean multiServersPerHost = false; // whether or not any host has more than one server

    ArrayList<String> tables;
    RegionInfo[] regions;
    Deque<BalancerRegionLoad>[] regionLoads; // regionIndex -> recent load samples; entries may be null
    private RegionLocationFinder regionFinder;

    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality

    int[]   serverIndexToHostIndex;      //serverIndex -> host index
    int[]   serverIndexToRackIndex;      //serverIndex -> rack index

    int[][] regionsPerServer;            //serverIndex -> region list
    int[]   serverIndexToRegionsOffset;  //serverIndex -> offset of region list
    int[][] regionsPerHost;              //hostIndex -> list of regions
    int[][] regionsPerRack;              //rackIndex -> region list
    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerHost;   //hostIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerRack;   //rackIndex -> sorted list of regions by primary region index

    int[][] serversPerHost;              //hostIndex -> list of server indexes
    int[][] serversPerRack;              //rackIndex -> list of server indexes
    int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
    int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
    int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
    int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
    int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
    boolean hasRegionReplicas = false;   //whether there is regions with replicas

    Integer[] serverIndicesSortedByRegionCount;
    Integer[] serverIndicesSortedByLocality;

    // Reverse lookups: entity -> its index in the arrays above.
    Map<Address, Integer> serversToIndex;
    Map<String, Integer> hostsToIndex;
    Map<String, Integer> racksToIndex;
    Map<String, Integer> tablesToIndex;
    Map<RegionInfo, Integer> regionsToIndex;
    float[] localityPerServer;

    int numServers;
    int numHosts;
    int numRacks;
    int numTables;
    int numRegions;

    int numMovedRegions = 0; //num moved regions from the initial configuration
    Map<ServerName, List<RegionInfo>> clusterState;

    protected final RackManager rackManager;
    // Maps region -> rackIndex -> locality of region on rack
    private float[][] rackLocalities;
    // Maps localityType -> region -> [server|rack]Index with highest locality
    private int[][] regionsToMostLocalEntities;
188
    /**
     * Convenience constructor for the common case of no unassigned regions;
     * delegates to the main constructor with a null unassigned collection.
     */
    protected Cluster(
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {
      this(null, clusterState, loads, regionFinder, rackManager);
    }
196
    /**
     * Builds the full array-based model of the cluster: indexes every server,
     * host, rack and table, counts regions, allocates the index arrays, then
     * registers each region and derives the per-host/per-rack views.
     * @param unassignedRegions regions without a current server; may be null
     * @param clusterState current region assignment, keyed by server
     * @param loads recent per-region load history keyed by region name; may be null
     * @param regionFinder optional locality lookup; may be null
     * @param rackManager rack resolver; null falls back to DefaultRackManager
     */
    @SuppressWarnings("unchecked")
    protected Cluster(
        Collection<RegionInfo> unassignedRegions,
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {

      if (unassignedRegions == null) {
        unassignedRegions = EMPTY_REGION_LIST;
      }

      serversToIndex = new HashMap<>();
      hostsToIndex = new HashMap<>();
      racksToIndex = new HashMap<>();
      tablesToIndex = new HashMap<>();

      //TODO: We should get the list of tables from master
      tables = new ArrayList<>();
      this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

      numRegions = 0;

      List<List<Integer>> serversPerHostList = new ArrayList<>();
      List<List<Integer>> serversPerRackList = new ArrayList<>();
      this.clusterState = clusterState;
      this.regionFinder = regionFinder;

      // Use servername and port as there can be dead servers in this list. We want everything with
      // a matching hostname and port to have the same index.
      for (ServerName sn : clusterState.keySet()) {
        if (sn == null) {
          LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " +
              "skipping; unassigned regions?");
          if (LOG.isTraceEnabled()) {
            LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
          }
          continue;
        }
        if (serversToIndex.get(sn.getAddress()) == null) {
          serversToIndex.put(sn.getAddress(), numServers++);
        }
        if (!hostsToIndex.containsKey(sn.getHostname())) {
          hostsToIndex.put(sn.getHostname(), numHosts++);
          serversPerHostList.add(new ArrayList<>(1));
        }

        int serverIndex = serversToIndex.get(sn.getAddress());
        int hostIndex = hostsToIndex.get(sn.getHostname());
        serversPerHostList.get(hostIndex).add(serverIndex);

        String rack = this.rackManager.getRack(sn);
        if (!racksToIndex.containsKey(rack)) {
          racksToIndex.put(rack, numRacks++);
          serversPerRackList.add(new ArrayList<>());
        }
        int rackIndex = racksToIndex.get(rack);
        serversPerRackList.get(rackIndex).add(serverIndex);
      }

      // Count how many regions there are.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        numRegions += entry.getValue().size();
      }
      numRegions += unassignedRegions.size();

      // Allocate all index arrays now that the final counts are known.
      regionsToIndex = new HashMap<>(numRegions);
      servers = new ServerName[numServers];
      serversPerHost = new int[numHosts][];
      serversPerRack = new int[numRacks][];
      regions = new RegionInfo[numRegions];
      regionIndexToServerIndex = new int[numRegions];
      initialRegionIndexToServerIndex = new int[numRegions];
      regionIndexToTableIndex = new int[numRegions];
      regionIndexToPrimaryIndex = new int[numRegions];
      regionLoads = new Deque[numRegions];

      regionLocations = new int[numRegions][];
      serverIndicesSortedByRegionCount = new Integer[numServers];
      serverIndicesSortedByLocality = new Integer[numServers];
      localityPerServer = new float[numServers];

      serverIndexToHostIndex = new int[numServers];
      serverIndexToRackIndex = new int[numServers];
      regionsPerServer = new int[numServers][];
      serverIndexToRegionsOffset = new int[numServers];
      regionsPerHost = new int[numHosts][];
      regionsPerRack = new int[numRacks][];
      primariesOfRegionsPerServer = new int[numServers][];
      primariesOfRegionsPerHost = new int[numHosts][];
      primariesOfRegionsPerRack = new int[numRacks][];

      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;

      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        if (entry.getKey() == null) {
          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
          continue;
        }
        int serverIndex = serversToIndex.get(entry.getKey().getAddress());

        // keep the servername if this is the first server name for this hostname
        // or this servername has the newest startcode.
        if (servers[serverIndex] == null ||
            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
          servers[serverIndex] = entry.getKey();
        }

        if (regionsPerServer[serverIndex] != null) {
          // there is another server with the same hostAndPort in ClusterState.
          // allocate the array for the total size
          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
        } else {
          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
        }
        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
        serverIndicesSortedByLocality[serverIndex] = serverIndex;
      }

      hosts = new String[numHosts];
      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
        hosts[entry.getValue()] = entry.getKey();
      }
      racks = new String[numRacks];
      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
        racks[entry.getValue()] = entry.getKey();
      }

      // Register each assigned region under its server, host and rack.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        int serverIndex = serversToIndex.get(entry.getKey().getAddress());
        regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];

        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
        serverIndexToHostIndex[serverIndex] = hostIndex;

        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
        serverIndexToRackIndex[serverIndex] = rackIndex;

        for (RegionInfo region : entry.getValue()) {
          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
          regionIndex++;
        }
        serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
      }

      // Unassigned regions are registered with server index -1.
      for (RegionInfo region : unassignedRegions) {
        registerRegion(region, regionIndex, -1, loads, regionFinder);
        regionIndex++;
      }

      for (int i = 0; i < serversPerHostList.size(); i++) {
        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
        for (int j = 0; j < serversPerHost[i].length; j++) {
          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
        }
        if (serversPerHost[i].length > 1) {
          multiServersPerHost = true;
        }
      }

      for (int i = 0; i < serversPerRackList.size(); i++) {
        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
        for (int j = 0; j < serversPerRack[i].length; j++) {
          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
        }
      }

      numTables = tables.size();
      numRegionsPerServerPerTable = new int[numServers][numTables];

      for (int i = 0; i < numServers; i++) {
        for (int j = 0; j < numTables; j++) {
          numRegionsPerServerPerTable[i][j] = 0;
        }
      }

      for (int i=0; i < regionIndexToServerIndex.length; i++) {
        if (regionIndexToServerIndex[i] >= 0) {
          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
        }
      }

      // numMaxRegionsPerTable[t] = max over all servers of the region count of table t.
      numMaxRegionsPerTable = new int[numTables];
      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // Map each region to the index of its primary replica (-1 when the
      // primary is not present in this Cluster).
      for (int i = 0; i < regions.length; i ++) {
        RegionInfo info = regions[i];
        if (RegionReplicaUtil.isDefaultReplica(info)) {
          regionIndexToPrimaryIndex[i] = i;
        } else {
          hasRegionReplicas = true;
          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
        }
      }

      for (int i = 0; i < regionsPerServer.length; i++) {
        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
        for (int j = 0; j < regionsPerServer[i].length; j++) {
          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
          primariesOfRegionsPerServer[i][j] = primaryIndex;
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerServer[i]);
      }

      // compute regionsPerHost
      if (multiServersPerHost) {
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionsPerHost = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
          }
          regionsPerHost[i] = new int[numRegionsPerHost];
          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
        }
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionPerHostIndex = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
              int region = regionsPerServer[serversPerHost[i][j]][k];
              regionsPerHost[i][numRegionPerHostIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
              numRegionPerHostIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerHost[i]);
        }
      }

      // compute regionsPerRack
      if (numRacks > 1) {
        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionsPerRack = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
          }
          regionsPerRack[i] = new int[numRegionsPerRack];
          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
        }

        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionPerRackIndex = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
              int region = regionsPerServer[serversPerRack[i][j]][k];
              regionsPerRack[i][numRegionPerRackIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
              numRegionPerRackIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerRack[i]);
        }
      }
    }
464
465    /** Helper for Cluster constructor to handle a region */
466    private void registerRegion(RegionInfo region, int regionIndex,
467        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
468        RegionLocationFinder regionFinder) {
469      String tableName = region.getTable().getNameAsString();
470      if (!tablesToIndex.containsKey(tableName)) {
471        tables.add(tableName);
472        tablesToIndex.put(tableName, tablesToIndex.size());
473      }
474      int tableIndex = tablesToIndex.get(tableName);
475
476      regionsToIndex.put(region, regionIndex);
477      regions[regionIndex] = region;
478      regionIndexToServerIndex[regionIndex] = serverIndex;
479      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
480      regionIndexToTableIndex[regionIndex] = tableIndex;
481
482      // region load
483      if (loads != null) {
484        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
485        // That could have failed if the RegionLoad is using the other regionName
486        if (rl == null) {
487          // Try getting the region load using encoded name.
488          rl = loads.get(region.getEncodedName());
489        }
490        regionLoads[regionIndex] = rl;
491      }
492
493      if (regionFinder != null) {
494        // region location
495        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
496        regionLocations[regionIndex] = new int[loc.size()];
497        for (int i = 0; i < loc.size(); i++) {
498          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
499              : (serversToIndex.get(loc.get(i).getAddress()) == null ? -1
500                  : serversToIndex.get(loc.get(i).getAddress()));
501        }
502      }
503    }
504
505    /**
506     * Returns true iff a given server has less regions than the balanced amount
507     */
508    public boolean serverHasTooFewRegions(int server) {
509      int minLoad = this.numRegions / numServers;
510      int numRegions = getNumRegions(server);
511      return numRegions < minLoad;
512    }
513
514    /**
515     * Retrieves and lazily initializes a field storing the locality of
516     * every region/server combination
517     */
518    public float[][] getOrComputeRackLocalities() {
519      if (rackLocalities == null || regionsToMostLocalEntities == null) {
520        computeCachedLocalities();
521      }
522      return rackLocalities;
523    }
524
525    /**
526     * Lazily initializes and retrieves a mapping of region -> server for which region has
527     * the highest the locality
528     */
529    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
530      if (rackLocalities == null || regionsToMostLocalEntities == null) {
531        computeCachedLocalities();
532      }
533      return regionsToMostLocalEntities[type.ordinal()];
534    }
535
536    /**
537     * Looks up locality from cache of localities. Will create cache if it does
538     * not already exist.
539     */
540    public float getOrComputeLocality(int region, int entity, LocalityType type) {
541      switch (type) {
542        case SERVER:
543          return getLocalityOfRegion(region, entity);
544        case RACK:
545          return getOrComputeRackLocalities()[region][entity];
546        default:
547          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
548      }
549    }
550
551    /**
552     * Returns locality weighted by region size in MB. Will create locality cache
553     * if it does not already exist.
554     */
555    public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
556      return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
557    }
558
559    /**
560     * Returns the size in MB from the most recent RegionLoad for region
561     */
562    public int getRegionSizeMB(int region) {
563      Deque<BalancerRegionLoad> load = regionLoads[region];
564      // This means regions have no actual data on disk
565      if (load == null) {
566        return 0;
567      }
568      return regionLoads[region].getLast().getStorefileSizeMB();
569    }
570
571    /**
572     * Computes and caches the locality for each region/rack combinations,
573     * as well as storing a mapping of region -> server and region -> rack such that server
574     * and rack have the highest locality for region
575     */
576    private void computeCachedLocalities() {
577      rackLocalities = new float[numRegions][numRacks];
578      regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
579
580      // Compute localities and find most local server per region
581      for (int region = 0; region < numRegions; region++) {
582        int serverWithBestLocality = 0;
583        float bestLocalityForRegion = 0;
584        for (int server = 0; server < numServers; server++) {
585          // Aggregate per-rack locality
586          float locality = getLocalityOfRegion(region, server);
587          int rack = serverIndexToRackIndex[server];
588          int numServersInRack = serversPerRack[rack].length;
589          rackLocalities[region][rack] += locality / numServersInRack;
590
591          if (locality > bestLocalityForRegion) {
592            serverWithBestLocality = server;
593            bestLocalityForRegion = locality;
594          }
595        }
596        regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
597
598        // Find most local rack per region
599        int rackWithBestLocality = 0;
600        float bestRackLocalityForRegion = 0.0f;
601        for (int rack = 0; rack < numRacks; rack++) {
602          float rackLocality = rackLocalities[region][rack];
603          if (rackLocality > bestRackLocalityForRegion) {
604            bestRackLocalityForRegion = rackLocality;
605            rackWithBestLocality = rack;
606          }
607        }
608        regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
609      }
610
611    }
612
613    /**
614     * Maps region index to rack index
615     */
616    public int getRackForRegion(int region) {
617      return serverIndexToRackIndex[regionIndexToServerIndex[region]];
618    }
619
    // Whether a locality value refers to a single server or a whole rack.
    enum LocalityType {
      SERVER,
      RACK
    }
624
625    /** An action to move or swap a region */
626    public static class Action {
627      public enum Type {
628        ASSIGN_REGION,
629        MOVE_REGION,
630        SWAP_REGIONS,
631        NULL,
632      }
633
634      public Type type;
635      public Action (Type type) {this.type = type;}
636      /** Returns an Action which would undo this action */
637      public Action undoAction() { return this; }
638      @Override
639      public String toString() { return type + ":";}
640    }
641
642    public static class AssignRegionAction extends Action {
643      public int region;
644      public int server;
645      public AssignRegionAction(int region, int server) {
646        super(Type.ASSIGN_REGION);
647        this.region = region;
648        this.server = server;
649      }
650      @Override
651      public Action undoAction() {
652        // TODO implement this. This action is not being used by the StochasticLB for now
653        // in case it uses it, we should implement this function.
654        throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
655      }
656      @Override
657      public String toString() {
658        return type + ": " + region + ":" + server;
659      }
660    }
661
662    public static class MoveRegionAction extends Action {
663      public int region;
664      public int fromServer;
665      public int toServer;
666
667      public MoveRegionAction(int region, int fromServer, int toServer) {
668        super(Type.MOVE_REGION);
669        this.fromServer = fromServer;
670        this.region = region;
671        this.toServer = toServer;
672      }
673      @Override
674      public Action undoAction() {
675        return new MoveRegionAction (region, toServer, fromServer);
676      }
677      @Override
678      public String toString() {
679        return type + ": " + region + ":" + fromServer + " -> " + toServer;
680      }
681    }
682
683    public static class SwapRegionsAction extends Action {
684      public int fromServer;
685      public int fromRegion;
686      public int toServer;
687      public int toRegion;
688      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
689        super(Type.SWAP_REGIONS);
690        this.fromServer = fromServer;
691        this.fromRegion = fromRegion;
692        this.toServer = toServer;
693        this.toRegion = toRegion;
694      }
695      @Override
696      public Action undoAction() {
697        return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
698      }
699      @Override
700      public String toString() {
701        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
702      }
703    }
704
    // Shared singleton "do nothing" action; non-conventional name kept for
    // compatibility (see the FindBugs justification below).
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION",
        justification="Mistake. Too disruptive to change now")
    public static final Action NullAction = new Action(Type.NULL);
708
709    public void doAction(Action action) {
710      switch (action.type) {
711      case NULL: break;
712      case ASSIGN_REGION:
713        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
714        assert action instanceof AssignRegionAction: action.getClass();
715        AssignRegionAction ar = (AssignRegionAction) action;
716        regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
717        regionMoved(ar.region, -1, ar.server);
718        break;
719      case MOVE_REGION:
720        assert action instanceof MoveRegionAction: action.getClass();
721        MoveRegionAction mra = (MoveRegionAction) action;
722        regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
723        regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
724        regionMoved(mra.region, mra.fromServer, mra.toServer);
725        break;
726      case SWAP_REGIONS:
727        assert action instanceof SwapRegionsAction: action.getClass();
728        SwapRegionsAction a = (SwapRegionsAction) action;
729        regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
730        regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
731        regionMoved(a.fromRegion, a.fromServer, a.toServer);
732        regionMoved(a.toRegion, a.toServer, a.fromServer);
733        break;
734      default:
735        throw new RuntimeException("Uknown action:" + action.type);
736      }
737    }
738
739    /**
740     * Return true if the placement of region on server would lower the availability
741     * of the region in question
742     * @return true or false
743     */
744    boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
745      if (!serversToIndex.containsKey(serverName.getAddress())) {
746        return false; // safeguard against race between cluster.servers and servers from LB method args
747      }
748      int server = serversToIndex.get(serverName.getAddress());
749      int region = regionsToIndex.get(regionInfo);
750
751      // Region replicas for same region should better assign to different servers
752      for (int i : regionsPerServer[server]) {
753        RegionInfo otherRegionInfo = regions[i];
754        if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
755          return true;
756        }
757      }
758
759      int primary = regionIndexToPrimaryIndex[region];
760      if (primary == -1) {
761        return false;
762      }
763      // there is a subset relation for server < host < rack
764      // check server first
765      if (contains(primariesOfRegionsPerServer[server], primary)) {
766        // check for whether there are other servers that we can place this region
767        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
768          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
769            return true; // meaning there is a better server
770          }
771        }
772        return false; // there is not a better server to place this
773      }
774
775      // check host
776      if (multiServersPerHost) {
777        // these arrays would only be allocated if we have more than one server per host
778        int host = serverIndexToHostIndex[server];
779        if (contains(primariesOfRegionsPerHost[host], primary)) {
780          // check for whether there are other hosts that we can place this region
781          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
782            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
783              return true; // meaning there is a better host
784            }
785          }
786          return false; // there is not a better host to place this
787        }
788      }
789
790      // check rack
791      if (numRacks > 1) {
792        int rack = serverIndexToRackIndex[server];
793        if (contains(primariesOfRegionsPerRack[rack], primary)) {
794          // check for whether there are other racks that we can place this region
795          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
796            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
797              return true; // meaning there is a better rack
798            }
799          }
800          return false; // there is not a better rack to place this
801        }
802      }
803
804      return false;
805    }
806
807    void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
808      if (!serversToIndex.containsKey(serverName.getAddress())) {
809        return;
810      }
811      int server = serversToIndex.get(serverName.getAddress());
812      int region = regionsToIndex.get(regionInfo);
813      doAction(new AssignRegionAction(region, server));
814    }
815
    /**
     * Updates all bookkeeping structures after {@code region} moved from
     * {@code oldServer} to {@code newServer}. An {@code oldServer} of -1 means the
     * region was previously unassigned (fresh assignment rather than a move).
     * Maintains: region-to-server map, moved-region count, per-table per-server
     * counts and maxima, and the per-server/host/rack primary and region arrays.
     */
    void regionMoved(int region, int oldServer, int newServer) {
      regionIndexToServerIndex[region] = newServer;
      if (initialRegionIndexToServerIndex[region] == newServer) {
        numMovedRegions--; //region moved back to original location
      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
        numMovedRegions++; //region moved from original location
      }
      int tableIndex = regionIndexToTableIndex[region];
      if (oldServer >= 0) {
        numRegionsPerServerPerTable[oldServer][tableIndex]--;
      }
      numRegionsPerServerPerTable[newServer][tableIndex]++;

      //check whether this caused maxRegionsPerTable in the new Server to be updated
      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
          == numMaxRegionsPerTable[tableIndex]) {
        //recompute maxRegionsPerTable since the previous value was coming from the old server
        numMaxRegionsPerTable[tableIndex] = 0;
        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // update for servers
      int primary = regionIndexToPrimaryIndex[region];
      if (oldServer >= 0) {
        primariesOfRegionsPerServer[oldServer] = removeRegion(
          primariesOfRegionsPerServer[oldServer], primary);
      }
      primariesOfRegionsPerServer[newServer] = addRegionSorted(
        primariesOfRegionsPerServer[newServer], primary);

      // update for hosts
      if (multiServersPerHost) {
        // these structures are only maintained when a host can have multiple servers
        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
        int newHost = serverIndexToHostIndex[newServer];
        if (newHost != oldHost) {
          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
          if (oldHost >= 0) {
            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
            primariesOfRegionsPerHost[oldHost] = removeRegion(
              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
          }
        }
      }

      // update for racks
      if (numRacks > 1) {
        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
        int newRack = serverIndexToRackIndex[newServer];
        if (newRack != oldRack) {
          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
          if (oldRack >= 0) {
            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
            primariesOfRegionsPerRack[oldRack] = removeRegion(
              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
          }
        }
      }
    }
882
883    int[] removeRegion(int[] regions, int regionIndex) {
884      //TODO: this maybe costly. Consider using linked lists
885      int[] newRegions = new int[regions.length - 1];
886      int i = 0;
887      for (i = 0; i < regions.length; i++) {
888        if (regions[i] == regionIndex) {
889          break;
890        }
891        newRegions[i] = regions[i];
892      }
893      System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
894      return newRegions;
895    }
896
897    int[] addRegion(int[] regions, int regionIndex) {
898      int[] newRegions = new int[regions.length + 1];
899      System.arraycopy(regions, 0, newRegions, 0, regions.length);
900      newRegions[newRegions.length - 1] = regionIndex;
901      return newRegions;
902    }
903
904    int[] addRegionSorted(int[] regions, int regionIndex) {
905      int[] newRegions = new int[regions.length + 1];
906      int i = 0;
907      for (i = 0; i < regions.length; i++) { // find the index to insert
908        if (regions[i] > regionIndex) {
909          break;
910        }
911      }
912      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
913      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
914      newRegions[i] = regionIndex;
915
916      return newRegions;
917    }
918
919    int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
920      int i = 0;
921      for (i = 0; i < regions.length; i++) {
922        if (regions[i] == regionIndex) {
923          regions[i] = newRegionIndex;
924          break;
925        }
926      }
927      return regions;
928    }
929
    /** Re-sorts serverIndicesSortedByRegionCount by each server's current region count. */
    void sortServersByRegionCount() {
      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
    }
933
    /** @return the number of regions currently hosted by the given server index. */
    int getNumRegions(int server) {
      return regionsPerServer[server].length;
    }
937
    /**
     * Returns true if {@code arr} contains {@code val}. Uses binary search, so
     * {@code arr} must be sorted ascending — this holds for the
     * primariesOfRegionsPer* arrays, which are maintained via
     * addRegionSorted/removeRegion.
     */
    boolean contains(int[] arr, int val) {
      return Arrays.binarySearch(arr, val) >= 0;
    }
941
    // Orders server indices by the number of regions they currently host (ascending).
    private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
943
    /**
     * Finds the region on the given server with the lowest HDFS block locality.
     * Regions with no blocks (zero unique-block weight) are skipped, and regions
     * whose locality is exactly 1.0 are never selected (the strict '<' against the
     * 1.0f starting value excludes them).
     * @return the region index with lowest locality, or -1 when no region finder is
     *         configured, the server hosts no regions, or no qualifying region exists
     */
    int getLowestLocalityRegionOnServer(int serverIndex) {
      if (regionFinder != null) {
        float lowestLocality = 1.0f;
        int lowestLocalityRegionIndex = -1;
        if (regionsPerServer[serverIndex].length == 0) {
          // No regions on that region server
          return -1;
        }
        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
          int regionIndex = regionsPerServer[serverIndex][j];
          HDFSBlocksDistribution distribution = regionFinder
              .getBlockDistribution(regions[regionIndex]);
          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
          // skip empty region
          if (distribution.getUniqueBlocksTotalWeight() == 0) {
            continue;
          }
          if (locality < lowestLocality) {
            lowestLocality = locality;
            lowestLocalityRegionIndex = j;
          }
        }
        if (lowestLocalityRegionIndex == -1) {
          return -1;
        }
        if (LOG.isTraceEnabled()) {
          LOG.trace("Lowest locality region is "
              + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
                  .getRegionNameAsString() + " with locality " + lowestLocality
              + " and its region server contains " + regionsPerServer[serverIndex].length
              + " regions");
        }
        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
      } else {
        return -1;
      }
    }
981
982    float getLocalityOfRegion(int region, int server) {
983      if (regionFinder != null) {
984        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
985        return distribution.getBlockLocalityIndex(servers[server].getHostname());
986      } else {
987        return 0f;
988      }
989    }
990
    // Test hook: overrides the computed total region count.
    @VisibleForTesting
    protected void setNumRegions(int numRegions) {
      this.numRegions = numRegions;
    }
995
    // Test hook: overrides the moved-region counter.
    @VisibleForTesting
    protected void setNumMovedRegions(int numMovedRegions) {
      this.numMovedRegions = numMovedRegions;
    }
1000
    // NOTE(review): the SBSC suppression looks stale — the method already uses
    // StringBuilder — but is left untouched here.
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
        justification="Not important but should be fixed")
    @Override
    public String toString() {
      // Debug summary of the cluster snapshot: servers, per-server regions and counters.
      StringBuilder desc = new StringBuilder("Cluster={servers=[");
      for(ServerName sn:servers) {
        desc.append(sn.getAddress().toString()).append(", ");
      }
      desc.append("], serverIndicesSortedByRegionCount=")
          .append(Arrays.toString(serverIndicesSortedByRegionCount))
          .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));

      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
          .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
          .append(", numTables=").append(numTables).append(", numMovedRegions=")
          .append(numMovedRegions).append('}');
      return desc.toString();
    }
1019  }
1020
  // slop for regions: tolerated deviation from the per-server average region count
  protected float slop;
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  protected float overallSlop;
  protected Configuration config = HBaseConfiguration.create();
  protected RackManager rackManager;
  // Time-seeded shared PRNG; not suitable for security-sensitive use.
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
  protected MetricsBalancer metricsBalancer = null;
  protected ClusterMetrics clusterStatus = null;
  // Server name of the active master; may be expected to carry system tables only.
  protected ServerName masterServerName;
  protected MasterServices services;
  // True when only system tables should be placed on the master region server.
  protected boolean onlySystemTablesOnMaster;
  protected boolean maintenanceMode;
1035
  /**
   * Applies {@code conf}: reads the slop values (each clamped to [0, 1]), the
   * system-tables-on-master policy, rack manager, region finder configuration and
   * the by-table balancing flag.
   */
  @Override
  public void setConf(Configuration conf) {
    this.config = conf;
    setSlop(conf);
    if (slop < 0) slop = 0;
    else if (slop > 1) slop = 1;

    if (overallSlop < 0) overallSlop = 0;
    else if (overallSlop > 1) overallSlop = 1;

    this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config);

    this.rackManager = new RackManager(getConf());
    if (useRegionFinder) {
      regionFinder.setConf(conf);
    }
    this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
    // Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
    LOG.info("slop={}, systemTablesOnMaster={}",
        this.slop, this.onlySystemTablesOnMaster);
  }
1057
1058  protected void setSlop(Configuration conf) {
1059    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
1060    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
1061  }
1062
1063  /**
1064   * Check if a region belongs to some system table.
1065   * If so, the primary replica may be expected to be put on the master regionserver.
1066   */
1067  public boolean shouldBeOnMaster(RegionInfo region) {
1068    return (this.maintenanceMode || this.onlySystemTablesOnMaster)
1069        && region.getTable().isSystemTable();
1070  }
1071
1072  /**
1073   * Balance the regions that should be on master regionserver.
1074   */
1075  protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) {
1076    if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null;
1077    List<RegionPlan> plans = null;
1078    List<RegionInfo> regions = clusterMap.get(masterServerName);
1079    if (regions != null) {
1080      Iterator<ServerName> keyIt = null;
1081      for (RegionInfo region: regions) {
1082        if (shouldBeOnMaster(region)) continue;
1083
1084        // Find a non-master regionserver to host the region
1085        if (keyIt == null || !keyIt.hasNext()) {
1086          keyIt = clusterMap.keySet().iterator();
1087        }
1088        ServerName dest = keyIt.next();
1089        if (masterServerName.equals(dest)) {
1090          if (!keyIt.hasNext()) {
1091            keyIt = clusterMap.keySet().iterator();
1092          }
1093          dest = keyIt.next();
1094        }
1095
1096        // Move this region away from the master regionserver
1097        RegionPlan plan = new RegionPlan(region, masterServerName, dest);
1098        if (plans == null) {
1099          plans = new ArrayList<>();
1100        }
1101        plans.add(plan);
1102      }
1103    }
1104    for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) {
1105      if (masterServerName.equals(server.getKey())) continue;
1106      for (RegionInfo region: server.getValue()) {
1107        if (!shouldBeOnMaster(region)) continue;
1108
1109        // Move this region to the master regionserver
1110        RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName);
1111        if (plans == null) {
1112          plans = new ArrayList<>();
1113        }
1114        plans.add(plan);
1115      }
1116    }
1117    return plans;
1118  }
1119
1120  /**
1121   * If master is configured to carry system tables only, in here is
1122   * where we figure what to assign it.
1123   */
1124  protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions(
1125      Collection<RegionInfo> regions, List<ServerName> servers) {
1126    if (servers == null || regions == null || regions.isEmpty()) {
1127      return null;
1128    }
1129    Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
1130    if (this.maintenanceMode || this.onlySystemTablesOnMaster) {
1131      if (masterServerName != null && servers.contains(masterServerName)) {
1132        assignments.put(masterServerName, new ArrayList<>());
1133        for (RegionInfo region : regions) {
1134          if (shouldBeOnMaster(region)) {
1135            assignments.get(masterServerName).add(region);
1136          }
1137        }
1138      }
1139    }
1140    return assignments;
1141  }
1142
  /** @return the configuration this balancer is currently using. */
  @Override
  public Configuration getConf() {
    return this.config;
  }
1147
  /** Caches the latest cluster metrics and forwards them to the region finder, if enabled. */
  @Override
  public synchronized void setClusterMetrics(ClusterMetrics st) {
    this.clusterStatus = st;
    if (useRegionFinder) {
      regionFinder.setClusterMetrics(st);
    }
  }
1155
1156
  /**
   * Wires in the master services: captures the master's server name, forwards the
   * services to the region finder (if enabled), and latches maintenance mode.
   */
  @Override
  public void setMasterServices(MasterServices masterServices) {
    masterServerName = masterServices.getServerName();
    this.services = masterServices;
    if (useRegionFinder) {
      this.regionFinder.setServices(masterServices);
    }
    if (this.services.isInMaintenanceMode()) {
      this.maintenanceMode = true;
    }
  }
1168
  /**
   * After master startup, warms the region finder's HDFS block-distribution cache
   * for all currently assigned regions. Failures are logged and ignored — this is
   * a best-effort optimization, not a correctness requirement.
   */
  @Override
  public void postMasterStartupInitialize() {
    if (services != null && regionFinder != null) {
      try {
        Set<RegionInfo> regions =
            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
        regionFinder.refreshAndWait(regions);
      } catch (Exception e) {
        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
      }
    }
  }
1181
  /** Replaces the rack manager (primarily a test/extension hook). */
  public void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }
1185
  /**
   * Determines whether balancing should run: requires at least MIN_SERVER_BALANCE
   * servers; then triggers if region replicas are co-located, an idle server
   * exists, or some server's load falls outside the slop-bounded band around the
   * cluster average (HBASE-3681).
   */
  protected boolean needsBalance(TableName tableName, Cluster c) {
    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not running balancer because only " + cs.getNumServers()
            + " active regionserver(s)");
      }
      return false;
    }
    if(areSomeRegionReplicasColocated(c)) return true;
    if(idleRegionServerExist(c)) {
      return true;
    }

    // Check if we even need to do any load balancing
    // HBASE-3681 check sloppiness first
    float average = cs.getLoadAverage(); // for logging
    int floor = (int) Math.floor(average * (1 - slop));
    int ceiling = (int) Math.ceil(average * (1 + slop));
    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because balanced cluster; " +
          "servers=" + cs.getNumServers() +
          " regions=" + cs.getNumRegions() + " average=" + average +
          " mostloaded=" + serversByLoad.lastKey().getLoad() +
          " leastloaded=" + serversByLoad.firstKey().getLoad());
      }
      return false;
    }
    return true;
  }
1219
1220  /**
1221   * Subclasses should implement this to return true if the cluster has nodes that hosts
1222   * multiple replicas for the same region, or, if there are multiple racks and the same
1223   * rack hosts replicas of the same region
1224   * @param c Cluster information
1225   * @return whether region replicas are currently co-located
1226   */
1227  protected boolean areSomeRegionReplicasColocated(Cluster c) {
1228    return false;
1229  }
1230
1231  protected final boolean idleRegionServerExist(Cluster c){
1232    boolean isServerExistsWithMoreRegions = false;
1233    boolean isServerExistsWithZeroRegions = false;
1234    for (int[] serverList: c.regionsPerServer){
1235      if (serverList.length > 1) {
1236        isServerExistsWithMoreRegions = true;
1237      }
1238      if (serverList.length == 0) {
1239        isServerExistsWithZeroRegions = true;
1240      }
1241    }
1242    return isServerExistsWithMoreRegions && isServerExistsWithZeroRegions;
1243  }
1244
1245  /**
1246   * Generates a bulk assignment plan to be used on cluster startup using a
1247   * simple round-robin assignment.
1248   * <p>
1249   * Takes a list of all the regions and all the servers in the cluster and
1250   * returns a map of each server to the regions that it should be assigned.
1251   * <p>
1252   * Currently implemented as a round-robin assignment. Same invariant as load
1253   * balancing, all servers holding floor(avg) or ceiling(avg).
1254   *
1255   * TODO: Use block locations from HDFS to place regions with their blocks
1256   *
1257   * @param regions all regions
1258   * @param servers all servers
1259   * @return map of server to the regions it should take, or null if no
1260   *         assignment is possible (ie. no regions or no servers)
1261   */
1262  @Override
1263  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
1264      List<ServerName> servers) throws HBaseIOException {
1265    metricsBalancer.incrMiscInvocations();
1266    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
1267    if (assignments != null && !assignments.isEmpty()) {
1268      servers = new ArrayList<>(servers);
1269      // Guarantee not to put other regions on master
1270      servers.remove(masterServerName);
1271      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1272      if (!masterRegions.isEmpty()) {
1273        regions = new ArrayList<>(regions);
1274        regions.removeAll(masterRegions);
1275      }
1276    }
1277    if (this.maintenanceMode || regions == null || regions.isEmpty()) {
1278      return assignments;
1279    }
1280
1281    int numServers = servers == null ? 0 : servers.size();
1282    if (numServers == 0) {
1283      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
1284      return null;
1285    }
1286
1287    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
1288    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
1289    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
1290    // and balanced. This should also run fast with fewer number of iterations.
1291
1292    if (numServers == 1) { // Only one server, nothing fancy we can do here
1293      ServerName server = servers.get(0);
1294      assignments.put(server, new ArrayList<>(regions));
1295      return assignments;
1296    }
1297
1298    Cluster cluster = createCluster(servers, regions);
1299    roundRobinAssignment(cluster, regions, servers, assignments);
1300    return assignments;
1301  }
1302
  /**
   * Builds a {@link Cluster} snapshot for the given servers and regions. If any of
   * the regions' tables is configured with region replication > 1, the snapshot is
   * built from the entire cluster's assignments (so existing replica placements are
   * visible); otherwise only the assignments of the passed regions are fetched, for
   * performance. Servers without current assignments are included with empty lists.
   * @throws HBaseIOException if the table descriptors cannot be read
   */
  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions)
      throws HBaseIOException {
    boolean hasRegionReplica = false;
    try {
      if (services != null && services.getTableDescriptors() != null) {
        Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
        for (RegionInfo regionInfo : regions) {
          TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
          if (td != null && td.getRegionReplication() > 1) {
            hasRegionReplica = true;
            break;
          }
        }
      }
    } catch (IOException ioe) {
      throw new HBaseIOException(ioe);
    }

    // Get the snapshot of the current assignments for the regions in question, and then create
    // a cluster out of it. Note that we might have replicas already assigned to some servers
    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
    // replicas of the regions that are passed (for performance).
    Map<ServerName, List<RegionInfo>> clusterState = null;
    if (!hasRegionReplica) {
      clusterState = getRegionAssignmentsByServer(regions);
    } else {
      // for the case where we have region replica it is better we get the entire cluster's snapshot
      clusterState = getRegionAssignmentsByServer(null);
    }

    for (ServerName server : servers) {
      if (!clusterState.containsKey(server)) {
        clusterState.put(server, EMPTY_REGION_LIST);
      }
    }
    return new Cluster(regions, clusterState, null, this.regionFinder,
        rackManager);
  }
1341
  /**
   * Filters {@code servers} down to those the ServerManager reports as online and
   * idle per IDLE_SERVER_PREDICATOR (defined elsewhere in this file).
   */
  private List<ServerName> findIdleServers(List<ServerName> servers) {
    return this.services.getServerManager()
            .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
  }
1346
1347  /**
1348   * Used to assign a single region to a random server.
1349   */
1350  @Override
1351  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
1352      throws HBaseIOException {
1353    metricsBalancer.incrMiscInvocations();
1354    if (servers != null && servers.contains(masterServerName)) {
1355      if (shouldBeOnMaster(regionInfo)) {
1356        return masterServerName;
1357      }
1358      if (!LoadBalancer.isTablesOnMaster(getConf())) {
1359        // Guarantee we do not put any regions on master
1360        servers = new ArrayList<>(servers);
1361        servers.remove(masterServerName);
1362      }
1363    }
1364
1365    int numServers = servers == null ? 0 : servers.size();
1366    if (numServers == 0) {
1367      LOG.warn("Wanted to retain assignment but no servers to assign to");
1368      return null;
1369    }
1370    if (numServers == 1) { // Only one server, nothing fancy we can do here
1371      return servers.get(0);
1372    }
1373    List<ServerName> idleServers = findIdleServers(servers);
1374    if (idleServers.size() == 1) {
1375      return idleServers.get(0);
1376    }
1377    final List<ServerName> finalServers = idleServers.isEmpty() ?
1378            servers : idleServers;
1379    List<RegionInfo> regions = Lists.newArrayList(regionInfo);
1380    Cluster cluster = createCluster(finalServers, regions);
1381    return randomAssignment(cluster, regionInfo, finalServers);
1382  }
1383
1384  /**
1385   * Generates a bulk assignment startup plan, attempting to reuse the existing
1386   * assignment information from META, but adjusting for the specified list of
1387   * available/online servers available for assignment.
1388   * <p>
1389   * Takes a map of all regions to their existing assignment from META. Also
1390   * takes a list of online servers for regions to be assigned to. Attempts to
1391   * retain all assignment, so in some instances initial assignment will not be
1392   * completely balanced.
1393   * <p>
1394   * Any leftover regions without an existing server to be assigned to will be
1395   * assigned randomly to available servers.
1396   *
1397   * @param regions regions and existing assignment from meta
1398   * @param servers available servers
1399   * @return map of servers and regions to be assigned to them
1400   */
1401  @Override
1402  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
1403      List<ServerName> servers) throws HBaseIOException {
1404    // Update metrics
1405    metricsBalancer.incrMiscInvocations();
1406    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions.keySet(), servers);
1407    if (assignments != null && !assignments.isEmpty()) {
1408      servers = new ArrayList<>(servers);
1409      // Guarantee not to put other regions on master
1410      servers.remove(masterServerName);
1411      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1412      regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey()))
1413          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
1414    }
1415    if (this.maintenanceMode || regions.isEmpty()) {
1416      return assignments;
1417    }
1418
1419    int numServers = servers == null ? 0 : servers.size();
1420    if (numServers == 0) {
1421      LOG.warn("Wanted to do retain assignment but no servers to assign to");
1422      return null;
1423    }
1424    if (numServers == 1) { // Only one server, nothing fancy we can do here
1425      ServerName server = servers.get(0);
1426      assignments.put(server, new ArrayList<>(regions.keySet()));
1427      return assignments;
1428    }
1429
1430    // Group all of the old assignments by their hostname.
1431    // We can't group directly by ServerName since the servers all have
1432    // new start-codes.
1433
1434    // Group the servers by their hostname. It's possible we have multiple
1435    // servers on the same host on different ports.
1436    ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
1437    for (ServerName server : servers) {
1438      assignments.put(server, new ArrayList<>());
1439      serversByHostname.put(server.getHostnameLowerCase(), server);
1440    }
1441
1442    // Collection of the hostnames that used to have regions
1443    // assigned, but for which we no longer have any RS running
1444    // after the cluster restart.
1445    Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
1446
1447    // If the old servers aren't present, lets assign those regions later.
1448    List<RegionInfo> randomAssignRegions = Lists.newArrayList();
1449
1450    int numRandomAssignments = 0;
1451    int numRetainedAssigments = 0;
1452    for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
1453      RegionInfo region = entry.getKey();
1454      ServerName oldServerName = entry.getValue();
1455      List<ServerName> localServers = new ArrayList<>();
1456      if (oldServerName != null) {
1457        localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
1458      }
1459      if (localServers.isEmpty()) {
1460        // No servers on the new cluster match up with this hostname, assign randomly, later.
1461        randomAssignRegions.add(region);
1462        if (oldServerName != null) {
1463          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
1464        }
1465      } else if (localServers.size() == 1) {
1466        // the usual case - one new server on same host
1467        ServerName target = localServers.get(0);
1468        assignments.get(target).add(region);
1469        numRetainedAssigments++;
1470      } else {
1471        // multiple new servers in the cluster on this same host
1472        if (localServers.contains(oldServerName)) {
1473          assignments.get(oldServerName).add(region);
1474          numRetainedAssigments++;
1475        } else {
1476          ServerName target = null;
1477          for (ServerName tmp : localServers) {
1478            if (tmp.getPort() == oldServerName.getPort()) {
1479              target = tmp;
1480              assignments.get(tmp).add(region);
1481              numRetainedAssigments++;
1482              break;
1483            }
1484          }
1485          if (target == null) {
1486            randomAssignRegions.add(region);
1487          }
1488        }
1489      }
1490    }
1491
1492    // If servers from prior assignment aren't present, then lets do randomAssignment on regions.
1493    if (randomAssignRegions.size() > 0) {
1494      Cluster cluster = createCluster(servers, regions.keySet());
1495      for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
1496        ServerName sn = entry.getKey();
1497        for (RegionInfo region : entry.getValue()) {
1498          cluster.doAssignRegion(region, sn);
1499        }
1500      }
1501      for (RegionInfo region : randomAssignRegions) {
1502        ServerName target = randomAssignment(cluster, region, servers);
1503        assignments.get(target).add(region);
1504        numRandomAssignments++;
1505      }
1506    }
1507
1508    String randomAssignMsg = "";
1509    if (numRandomAssignments > 0) {
1510      randomAssignMsg =
1511          numRandomAssignments + " regions were assigned "
1512              + "to random hosts, since the old hosts for these regions are no "
1513              + "longer present in the cluster. These hosts were:\n  "
1514              + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
1515    }
1516
1517    LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
1518        + " retained the pre-restart assignment. " + randomAssignMsg);
1519    return assignments;
1520  }
1521
  /**
   * No-op in this base implementation; subclasses may override to perform
   * one-time setup before the balancer is used.
   */
  @Override
  public void initialize() throws HBaseIOException{
  }
1525
  /**
   * No-op: this base implementation does not track per-region online events.
   */
  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }
1529
  /**
   * No-op: this base implementation does not track per-region offline events.
   */
  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }
1533
  /**
   * @return {@code true} once {@link #stop(String)} has been called.
   */
  @Override
  public boolean isStopped() {
    return stopped;
  }
1538
  /**
   * Marks this balancer as stopped (see {@link #isStopped()}) and logs the
   * supplied reason. There is no way to restart a stopped balancer.
   * @param why human-readable reason for the stop request, included in the log
   */
  @Override
  public void stop(String why) {
    LOG.info("Load Balancer stop requested: "+why);
    stopped = true;
  }
1544
1545  /**
1546  * Updates the balancer status tag reported to JMX
1547  */
1548  public void updateBalancerStatus(boolean status) {
1549    metricsBalancer.balancerStatus(status);
1550  }
1551
1552  /**
1553   * Used to assign a single region to a random server.
1554   */
1555  private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
1556      List<ServerName> servers) {
1557    int numServers = servers.size(); // servers is not null, numServers > 1
1558    ServerName sn = null;
1559    final int maxIterations = numServers * 4;
1560    int iterations = 0;
1561    List<ServerName> usedSNs = new ArrayList<>(servers.size());
1562    do {
1563      int i = RANDOM.nextInt(numServers);
1564      sn = servers.get(i);
1565      if (!usedSNs.contains(sn)) {
1566        usedSNs.add(sn);
1567      }
1568    } while (cluster.wouldLowerAvailability(regionInfo, sn)
1569        && iterations++ < maxIterations);
1570    if (iterations >= maxIterations) {
1571      // We have reached the max. Means the servers that we collected is still lowering the
1572      // availability
1573      for (ServerName unusedServer : servers) {
1574        if (!usedSNs.contains(unusedServer)) {
1575          // check if any other unused server is there for us to use.
1576          // If so use it. Else we have not other go but to go with one of them
1577          if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
1578            sn = unusedServer;
1579            break;
1580          }
1581        }
1582      }
1583    }
1584    cluster.doAssignRegion(regionInfo, sn);
1585    return sn;
1586  }
1587
1588  /**
1589   * Round robin a list of regions to a list of servers
1590   */
1591  private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
1592    List<ServerName> servers, Map<ServerName, List<RegionInfo>> assignments) {
1593    List<RegionInfo> unassignedRegions = new ArrayList<>();
1594    int numServers = servers.size();
1595    int numRegions = regions.size();
1596    int max = (int) Math.ceil((float) numRegions / numServers);
1597    int serverIdx = 0;
1598    if (numServers > 1) {
1599      serverIdx = RANDOM.nextInt(numServers);
1600    }
1601    int regionIdx = 0;
1602    for (int j = 0; j < numServers; j++) {
1603      ServerName server = servers.get((j + serverIdx) % numServers);
1604      List<RegionInfo> serverRegions = new ArrayList<>(max);
1605      for (int i = regionIdx; i < numRegions; i += numServers) {
1606        RegionInfo region = regions.get(i % numRegions);
1607        if (cluster.wouldLowerAvailability(region, server)) {
1608          unassignedRegions.add(region);
1609        } else {
1610          serverRegions.add(region);
1611          cluster.doAssignRegion(region, server);
1612        }
1613      }
1614      assignments.put(server, serverRegions);
1615      regionIdx++;
1616    }
1617
1618
1619    List<RegionInfo> lastFewRegions = new ArrayList<>();
1620    // assign the remaining by going through the list and try to assign to servers one-by-one
1621    serverIdx = RANDOM.nextInt(numServers);
1622    OUTER : for (RegionInfo region : unassignedRegions) {
1623      boolean assigned = false;
1624      INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
1625        ServerName server = servers.get((j + serverIdx) % numServers);
1626        if (cluster.wouldLowerAvailability(region, server)) {
1627          continue INNER;
1628        } else {
1629          assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
1630          cluster.doAssignRegion(region, server);
1631          serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
1632          assigned = true;
1633          break;
1634        }
1635      }
1636      if (!assigned) {
1637        lastFewRegions.add(region);
1638      }
1639    }
1640    // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
1641    // make it optimal later. we can end up with this if numReplicas > numServers.
1642    for (RegionInfo region : lastFewRegions) {
1643      int i = RANDOM.nextInt(numServers);
1644      ServerName server = servers.get(i);
1645      assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
1646      cluster.doAssignRegion(region, server);
1647    }
1648  }
1649
1650  protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
1651    Collection<RegionInfo> regions) {
1652    if (this.services != null && this.services.getAssignmentManager() != null) {
1653      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
1654    } else {
1655      return new HashMap<>();
1656    }
1657  }
1658
1659  private Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad(
1660      Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable) {
1661    Map<ServerName, List<RegionInfo>> returnMap = new TreeMap<>();
1662    for (Map<ServerName, List<RegionInfo>> serverNameListMap : LoadOfAllTable.values()) {
1663      serverNameListMap.forEach((serverName, regionInfoList) -> {
1664        List<RegionInfo> regionInfos =
1665            returnMap.computeIfAbsent(serverName, k -> new ArrayList<>());
1666        regionInfos.addAll(regionInfoList);
1667      });
1668    }
1669    return returnMap;
1670  }
1671
  /**
   * Computes a balancing plan for a single table's load; implemented by
   * concrete balancer subclasses.
   * @param tableName the table to balance
   * @param loadOfOneTable map of server to the table's regions it hosts
   * @return the list of region moves to perform, or null/empty if balanced
   */
  @Override
  public abstract List<RegionPlan> balanceTable(TableName tableName,
      Map<ServerName, List<RegionInfo>> loadOfOneTable);
1675
1676  @Override
1677  public List<RegionPlan>
1678      balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
1679    if (isByTable) {
1680      List<RegionPlan> result = new ArrayList<>();
1681      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
1682        LOG.info("Start Generate Balance plan for table: " + tableName);
1683        List<RegionPlan> partialPlans = balanceTable(tableName, loadOfOneTable);
1684        if (partialPlans != null) {
1685          result.addAll(partialPlans);
1686        }
1687      });
1688      return result;
1689    } else {
1690      LOG.info("Start Generate Balance plan for cluster.");
1691      return balanceTable(HConstants.ENSEMBLE_TABLE_NAME, toEnsumbleTableLoad(loadOfAllTable));
1692    }
1693  }
1694
  /**
   * No-op: this base implementation ignores runtime configuration changes;
   * subclasses may override to re-read tunables from {@code conf}.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
  }
1698}