001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.balancer;
020
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Comparator;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.NavigableMap;
032import java.util.Random;
033import java.util.Set;
034import java.util.TreeMap;
035import java.util.function.Predicate;
036import java.util.stream.Collectors;
037import org.apache.commons.lang3.NotImplementedException;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.ClusterMetrics;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HBaseIOException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.HDFSBlocksDistribution;
044import org.apache.hadoop.hbase.ServerMetrics;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionReplicaUtil;
049import org.apache.hadoop.hbase.master.LoadBalancer;
050import org.apache.hadoop.hbase.master.MasterServices;
051import org.apache.hadoop.hbase.master.RackManager;
052import org.apache.hadoop.hbase.master.RegionPlan;
053import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
060import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
061import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
062import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
063
064/**
065 * The base class for load balancers. It provides the the functions used to by
066 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
067 * in the edge cases. It doesn't provide an implementation of the
068 * actual balancing algorithm.
069 *
070 */
071@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {
  // Minimum number of servers required before a balance run is worth attempting.
  protected static final int MIN_SERVER_BALANCE = 2;
  // Set when the balancer is stopped; volatile so balancing threads see it promptly.
  private volatile boolean stopped = false;

  // Shared immutable placeholder used when a caller passes no unassigned regions.
  private static final List<RegionInfo> EMPTY_REGION_LIST = new ArrayList<>(0);

  // A server is considered "idle" when it currently hosts no regions at all.
  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
    = load -> load.getRegionMetrics().isEmpty();

  // Locality lookup; only created when locality-aware balancing is enabled.
  protected RegionLocationFinder regionFinder;
  protected boolean useRegionFinder;
083
  /** Fallback rack manager used when none is supplied: reports every server on the unknown rack. */
  private static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      // No topology information available, so all servers share a single rack.
      return UNKNOWN_RACK;
    }
  }
090
091  /**
092   * The constructor that uses the basic MetricsBalancer
093   */
094  protected BaseLoadBalancer() {
095    metricsBalancer = new MetricsBalancer();
096    createRegionFinder();
097  }
098
099  /**
100   * This Constructor accepts an instance of MetricsBalancer,
101   * which will be used instead of creating a new one
102   */
103  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
104    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
105    createRegionFinder();
106  }
107
  /**
   * Reads "hbase.master.balancer.uselocality" (default true) and, when enabled,
   * creates the RegionLocationFinder used to factor block locality into balancing.
   */
  private void createRegionFinder() {
    // NOTE(review): called from the constructors, so this relies on the 'config'
    // field being initialized before construction (e.g. via a field initializer)
    // — confirm, otherwise this would NPE.
    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
    if (useRegionFinder) {
      regionFinder = new RegionLocationFinder();
    }
  }
114
115  /**
116   * An efficient array based implementation similar to ClusterState for keeping
117   * the status of the cluster in terms of region assignment and distribution.
118   * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
119   * hundreds of thousands of hashmap manipulations are very costly, which is why this
120   * class uses mostly indexes and arrays.
121   *
122   * Cluster tracks a list of unassigned regions, region assignments, and the server
123   * topology in terms of server names, hostnames and racks.
124   */
  protected static class Cluster {
    ServerName[] servers;
    String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
    String[] racks;
    boolean multiServersPerHost = false; // whether or not any host has more than one server

    ArrayList<String> tables;            // tableIndex -> table name
    RegionInfo[] regions;                // regionIndex -> RegionInfo
    Deque<BalancerRegionLoad>[] regionLoads; // regionIndex -> load history (may hold nulls)
    private RegionLocationFinder regionFinder;

    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality

    int[]   serverIndexToHostIndex;      //serverIndex -> host index
    int[]   serverIndexToRackIndex;      //serverIndex -> rack index

    int[][] regionsPerServer;            //serverIndex -> region list
    int[][] regionsPerHost;              //hostIndex -> list of regions
    int[][] regionsPerRack;              //rackIndex -> region list
    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerHost;   //hostIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerRack;   //rackIndex -> sorted list of regions by primary region index

    int[][] serversPerHost;              //hostIndex -> list of server indexes
    int[][] serversPerRack;              //rackIndex -> list of server indexes
    int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
    int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
    int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
    int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
    int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
    boolean hasRegionReplicas = false;   //whether there is regions with replicas

    // Boxed server indexes so they can be sorted with custom Comparators.
    Integer[] serverIndicesSortedByRegionCount;
    Integer[] serverIndicesSortedByLocality;

    // Reverse lookups from name/key to the array indexes above.
    Map<String, Integer> serversToIndex;
    Map<String, Integer> hostsToIndex;
    Map<String, Integer> racksToIndex;
    Map<String, Integer> tablesToIndex;
    Map<RegionInfo, Integer> regionsToIndex;
    float[] localityPerServer;

    int numServers;
    int numHosts;
    int numRacks;
    int numTables;
    int numRegions;

    int numMovedRegions = 0; //num moved regions from the initial configuration
    Map<ServerName, List<RegionInfo>> clusterState;

    protected final RackManager rackManager;
    // Maps region -> rackIndex -> locality of region on rack
    private float[][] rackLocalities;
    // Maps localityType -> region -> [server|rack]Index with highest locality
    private int[][] regionsToMostLocalEntities;
182
    /**
     * Convenience constructor for the common case of no unassigned regions:
     * delegates to the full constructor with a null unassigned-region list.
     */
    protected Cluster(
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {
      this(null, clusterState, loads, regionFinder, rackManager);
    }
190
    /**
     * Builds the array-based cluster snapshot: first indexes servers, hosts, racks and
     * tables, then fills the per-server/per-host/per-rack region arrays that the
     * balancers operate on, and finally derives the replica-primary bookkeeping.
     * @param unassignedRegions regions not currently placed on any server (may be null)
     * @param clusterState current assignment of regions to servers
     * @param loads per-region load history, keyed by region name or encoded name
     * @param regionFinder locality lookup, or null when locality is disabled
     * @param rackManager rack topology source; defaults to a single unknown rack
     */
    @SuppressWarnings("unchecked")
    protected Cluster(
        Collection<RegionInfo> unassignedRegions,
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {

      if (unassignedRegions == null) {
        unassignedRegions = EMPTY_REGION_LIST;
      }

      serversToIndex = new HashMap<>();
      hostsToIndex = new HashMap<>();
      racksToIndex = new HashMap<>();
      tablesToIndex = new HashMap<>();

      //TODO: We should get the list of tables from master
      tables = new ArrayList<>();
      this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

      numRegions = 0;

      List<List<Integer>> serversPerHostList = new ArrayList<>();
      List<List<Integer>> serversPerRackList = new ArrayList<>();
      this.clusterState = clusterState;
      this.regionFinder = regionFinder;

      // Use servername and port as there can be dead servers in this list. We want everything with
      // a matching hostname and port to have the same index.
      for (ServerName sn : clusterState.keySet()) {
        if (sn == null) {
          LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " +
              "skipping; unassigned regions?");
          if (LOG.isTraceEnabled()) {
            LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
          }
          continue;
        }
        // NOTE(review): the lookup key is sn.getAddress().toString() while the put key is
        // sn.getHostAndPort(); these are expected to render the same "host:port" string —
        // confirm against ServerName, otherwise duplicate indexes could be created.
        if (serversToIndex.get(sn.getAddress().toString()) == null) {
          serversToIndex.put(sn.getHostAndPort(), numServers++);
        }
        if (!hostsToIndex.containsKey(sn.getHostname())) {
          hostsToIndex.put(sn.getHostname(), numHosts++);
          serversPerHostList.add(new ArrayList<>(1));
        }

        int serverIndex = serversToIndex.get(sn.getHostAndPort());
        int hostIndex = hostsToIndex.get(sn.getHostname());
        serversPerHostList.get(hostIndex).add(serverIndex);

        String rack = this.rackManager.getRack(sn);
        if (!racksToIndex.containsKey(rack)) {
          racksToIndex.put(rack, numRacks++);
          serversPerRackList.add(new ArrayList<>());
        }
        int rackIndex = racksToIndex.get(rack);
        serversPerRackList.get(rackIndex).add(serverIndex);
      }

      // Count how many regions there are.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        numRegions += entry.getValue().size();
      }
      numRegions += unassignedRegions.size();

      // Allocate all index arrays now that the counts are known.
      regionsToIndex = new HashMap<>(numRegions);
      servers = new ServerName[numServers];
      serversPerHost = new int[numHosts][];
      serversPerRack = new int[numRacks][];
      regions = new RegionInfo[numRegions];
      regionIndexToServerIndex = new int[numRegions];
      initialRegionIndexToServerIndex = new int[numRegions];
      regionIndexToTableIndex = new int[numRegions];
      regionIndexToPrimaryIndex = new int[numRegions];
      regionLoads = new Deque[numRegions];

      regionLocations = new int[numRegions][];
      serverIndicesSortedByRegionCount = new Integer[numServers];
      serverIndicesSortedByLocality = new Integer[numServers];
      localityPerServer = new float[numServers];

      serverIndexToHostIndex = new int[numServers];
      serverIndexToRackIndex = new int[numServers];
      regionsPerServer = new int[numServers][];
      regionsPerHost = new int[numHosts][];
      regionsPerRack = new int[numRacks][];
      primariesOfRegionsPerServer = new int[numServers][];
      primariesOfRegionsPerHost = new int[numHosts][];
      primariesOfRegionsPerRack = new int[numRacks][];

      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;

      // Pass over clusterState: pick a representative ServerName for each server slot and
      // size regionsPerServer (growing when two ServerNames share the same host:port).
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        if (entry.getKey() == null) {
          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
          continue;
        }
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());

        // keep the servername if this is the first server name for this hostname
        // or this servername has the newest startcode.
        if (servers[serverIndex] == null ||
            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
          servers[serverIndex] = entry.getKey();
        }

        if (regionsPerServer[serverIndex] != null) {
          // there is another server with the same hostAndPort in ClusterState.
          // allocate the array for the total size
          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
        } else {
          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
        }
        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
        serverIndicesSortedByLocality[serverIndex] = serverIndex;
      }

      // Invert the name->index maps into the flat name arrays.
      hosts = new String[numHosts];
      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
        hosts[entry.getValue()] = entry.getKey();
      }
      racks = new String[numRacks];
      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
        racks[entry.getValue()] = entry.getKey();
      }

      // Register every assigned region and record server->host / server->rack links.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
        regionPerServerIndex = 0;

        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
        serverIndexToHostIndex[serverIndex] = hostIndex;

        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
        serverIndexToRackIndex[serverIndex] = rackIndex;

        for (RegionInfo region : entry.getValue()) {
          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
          regionIndex++;
        }
      }

      // Unassigned regions get serverIndex -1.
      for (RegionInfo region : unassignedRegions) {
        registerRegion(region, regionIndex, -1, loads, regionFinder);
        regionIndex++;
      }

      // Flatten the per-host server lists into int arrays; note multi-server hosts.
      for (int i = 0; i < serversPerHostList.size(); i++) {
        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
        for (int j = 0; j < serversPerHost[i].length; j++) {
          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
        }
        if (serversPerHost[i].length > 1) {
          multiServersPerHost = true;
        }
      }

      // Flatten the per-rack server lists into int arrays.
      for (int i = 0; i < serversPerRackList.size(); i++) {
        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
        for (int j = 0; j < serversPerRack[i].length; j++) {
          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
        }
      }

      numTables = tables.size();
      numRegionsPerServerPerTable = new int[numServers][numTables];

      for (int i = 0; i < numServers; i++) {
        for (int j = 0; j < numTables; j++) {
          numRegionsPerServerPerTable[i][j] = 0;
        }
      }

      for (int i=0; i < regionIndexToServerIndex.length; i++) {
        if (regionIndexToServerIndex[i] >= 0) {
          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
        }
      }

      // Per table, record the maximum region count hosted by any single server.
      numMaxRegionsPerTable = new int[numTables];
      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // Map every region to its primary replica's index (-1 when the primary is absent).
      for (int i = 0; i < regions.length; i ++) {
        RegionInfo info = regions[i];
        if (RegionReplicaUtil.isDefaultReplica(info)) {
          regionIndexToPrimaryIndex[i] = i;
        } else {
          hasRegionReplicas = true;
          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
        }
      }

      for (int i = 0; i < regionsPerServer.length; i++) {
        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
        for (int j = 0; j < regionsPerServer[i].length; j++) {
          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
          primariesOfRegionsPerServer[i][j] = primaryIndex;
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerServer[i]);
      }

      // compute regionsPerHost
      if (multiServersPerHost) {
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionsPerHost = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
          }
          regionsPerHost[i] = new int[numRegionsPerHost];
          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
        }
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionPerHostIndex = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
              int region = regionsPerServer[serversPerHost[i][j]][k];
              regionsPerHost[i][numRegionPerHostIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
              numRegionPerHostIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerHost[i]);
        }
      }

      // compute regionsPerRack
      if (numRacks > 1) {
        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionsPerRack = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
          }
          regionsPerRack[i] = new int[numRegionsPerRack];
          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
        }

        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionPerRackIndex = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
              int region = regionsPerServer[serversPerRack[i][j]][k];
              regionsPerRack[i][numRegionPerRackIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
              numRegionPerRackIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerRack[i]);
        }
      }
    }
456
457    /** Helper for Cluster constructor to handle a region */
458    private void registerRegion(RegionInfo region, int regionIndex,
459        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
460        RegionLocationFinder regionFinder) {
461      String tableName = region.getTable().getNameAsString();
462      if (!tablesToIndex.containsKey(tableName)) {
463        tables.add(tableName);
464        tablesToIndex.put(tableName, tablesToIndex.size());
465      }
466      int tableIndex = tablesToIndex.get(tableName);
467
468      regionsToIndex.put(region, regionIndex);
469      regions[regionIndex] = region;
470      regionIndexToServerIndex[regionIndex] = serverIndex;
471      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
472      regionIndexToTableIndex[regionIndex] = tableIndex;
473
474      // region load
475      if (loads != null) {
476        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
477        // That could have failed if the RegionLoad is using the other regionName
478        if (rl == null) {
479          // Try getting the region load using encoded name.
480          rl = loads.get(region.getEncodedName());
481        }
482        regionLoads[regionIndex] = rl;
483      }
484
485      if (regionFinder != null) {
486        // region location
487        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
488        regionLocations[regionIndex] = new int[loc.size()];
489        for (int i = 0; i < loc.size(); i++) {
490          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
491              : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
492                  : serversToIndex.get(loc.get(i).getHostAndPort()));
493        }
494      }
495    }
496
497    /**
498     * Returns true iff a given server has less regions than the balanced amount
499     */
500    public boolean serverHasTooFewRegions(int server) {
501      int minLoad = this.numRegions / numServers;
502      int numRegions = getNumRegions(server);
503      return numRegions < minLoad;
504    }
505
506    /**
507     * Retrieves and lazily initializes a field storing the locality of
508     * every region/server combination
509     */
510    public float[][] getOrComputeRackLocalities() {
511      if (rackLocalities == null || regionsToMostLocalEntities == null) {
512        computeCachedLocalities();
513      }
514      return rackLocalities;
515    }
516
517    /**
518     * Lazily initializes and retrieves a mapping of region -> server for which region has
519     * the highest the locality
520     */
521    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
522      if (rackLocalities == null || regionsToMostLocalEntities == null) {
523        computeCachedLocalities();
524      }
525      return regionsToMostLocalEntities[type.ordinal()];
526    }
527
528    /**
529     * Looks up locality from cache of localities. Will create cache if it does
530     * not already exist.
531     */
532    public float getOrComputeLocality(int region, int entity, LocalityType type) {
533      switch (type) {
534        case SERVER:
535          return getLocalityOfRegion(region, entity);
536        case RACK:
537          return getOrComputeRackLocalities()[region][entity];
538        default:
539          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
540      }
541    }
542
543    /**
544     * Returns locality weighted by region size in MB. Will create locality cache
545     * if it does not already exist.
546     */
547    public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
548      return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
549    }
550
551    /**
552     * Returns the size in MB from the most recent RegionLoad for region
553     */
554    public int getRegionSizeMB(int region) {
555      Deque<BalancerRegionLoad> load = regionLoads[region];
556      // This means regions have no actual data on disk
557      if (load == null) {
558        return 0;
559      }
560      return regionLoads[region].getLast().getStorefileSizeMB();
561    }
562
563    /**
564     * Computes and caches the locality for each region/rack combinations,
565     * as well as storing a mapping of region -> server and region -> rack such that server
566     * and rack have the highest locality for region
567     */
568    private void computeCachedLocalities() {
569      rackLocalities = new float[numRegions][numRacks];
570      regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
571
572      // Compute localities and find most local server per region
573      for (int region = 0; region < numRegions; region++) {
574        int serverWithBestLocality = 0;
575        float bestLocalityForRegion = 0;
576        for (int server = 0; server < numServers; server++) {
577          // Aggregate per-rack locality
578          float locality = getLocalityOfRegion(region, server);
579          int rack = serverIndexToRackIndex[server];
580          int numServersInRack = serversPerRack[rack].length;
581          rackLocalities[region][rack] += locality / numServersInRack;
582
583          if (locality > bestLocalityForRegion) {
584            serverWithBestLocality = server;
585            bestLocalityForRegion = locality;
586          }
587        }
588        regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
589
590        // Find most local rack per region
591        int rackWithBestLocality = 0;
592        float bestRackLocalityForRegion = 0.0f;
593        for (int rack = 0; rack < numRacks; rack++) {
594          float rackLocality = rackLocalities[region][rack];
595          if (rackLocality > bestRackLocalityForRegion) {
596            bestRackLocalityForRegion = rackLocality;
597            rackWithBestLocality = rack;
598          }
599        }
600        regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
601      }
602
603    }
604
605    /**
606     * Maps region index to rack index
607     */
608    public int getRackForRegion(int region) {
609      return serverIndexToRackIndex[regionIndexToServerIndex[region]];
610    }
611
    /** Kind of entity a locality value refers to: an individual server or a whole rack. */
    enum LocalityType {
      SERVER,
      RACK
    }
616
    /** An action to move or swap a region. Base class for balancer plan steps. */
    public static class Action {
      public enum Type {
        ASSIGN_REGION,
        MOVE_REGION,
        SWAP_REGIONS,
        NULL,
      }

      public Type type;
      public Action (Type type) {this.type = type;}
      /** Returns an Action which would undo this action */
      // Base implementation returns itself: undoing the NULL action is a no-op.
      public Action undoAction() { return this; }
      @Override
      public String toString() { return type + ":";}
    }
633
    /** Action that assigns an (unassigned) region to a server. */
    public static class AssignRegionAction extends Action {
      public int region;
      public int server;
      public AssignRegionAction(int region, int server) {
        super(Type.ASSIGN_REGION);
        this.region = region;
        this.server = server;
      }
      @Override
      public Action undoAction() {
        // TODO implement this. This action is not being used by the StochasticLB for now
        // in case it uses it, we should implement this function.
        throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
      }
      @Override
      public String toString() {
        return type + ": " + region + ":" + server;
      }
    }
653
654    public static class MoveRegionAction extends Action {
655      public int region;
656      public int fromServer;
657      public int toServer;
658
659      public MoveRegionAction(int region, int fromServer, int toServer) {
660        super(Type.MOVE_REGION);
661        this.fromServer = fromServer;
662        this.region = region;
663        this.toServer = toServer;
664      }
665      @Override
666      public Action undoAction() {
667        return new MoveRegionAction (region, toServer, fromServer);
668      }
669      @Override
670      public String toString() {
671        return type + ": " + region + ":" + fromServer + " -> " + toServer;
672      }
673    }
674
675    public static class SwapRegionsAction extends Action {
676      public int fromServer;
677      public int fromRegion;
678      public int toServer;
679      public int toRegion;
680      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
681        super(Type.SWAP_REGIONS);
682        this.fromServer = fromServer;
683        this.fromRegion = fromRegion;
684        this.toServer = toServer;
685        this.toRegion = toRegion;
686      }
687      @Override
688      public Action undoAction() {
689        return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
690      }
691      @Override
692      public String toString() {
693        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
694      }
695    }
696
697    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION",
698        justification="Mistake. Too disruptive to change now")
699    public static final Action NullAction = new Action(Type.NULL);
700
701    public void doAction(Action action) {
702      switch (action.type) {
703      case NULL: break;
704      case ASSIGN_REGION:
705        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
706        assert action instanceof AssignRegionAction: action.getClass();
707        AssignRegionAction ar = (AssignRegionAction) action;
708        regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
709        regionMoved(ar.region, -1, ar.server);
710        break;
711      case MOVE_REGION:
712        assert action instanceof MoveRegionAction: action.getClass();
713        MoveRegionAction mra = (MoveRegionAction) action;
714        regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
715        regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
716        regionMoved(mra.region, mra.fromServer, mra.toServer);
717        break;
718      case SWAP_REGIONS:
719        assert action instanceof SwapRegionsAction: action.getClass();
720        SwapRegionsAction a = (SwapRegionsAction) action;
721        regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
722        regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
723        regionMoved(a.fromRegion, a.fromServer, a.toServer);
724        regionMoved(a.toRegion, a.toServer, a.fromServer);
725        break;
726      default:
727        throw new RuntimeException("Uknown action:" + action.type);
728      }
729    }
730
731    /**
732     * Return true if the placement of region on server would lower the availability
733     * of the region in question
734     * @return true or false
735     */
736    boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
737      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
738        return false; // safeguard against race between cluster.servers and servers from LB method args
739      }
740      int server = serversToIndex.get(serverName.getHostAndPort());
741      int region = regionsToIndex.get(regionInfo);
742
743      int primary = regionIndexToPrimaryIndex[region];
744
745      // there is a subset relation for server < host < rack
746      // check server first
747
748      if (contains(primariesOfRegionsPerServer[server], primary)) {
749        // check for whether there are other servers that we can place this region
750        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
751          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
752            return true; // meaning there is a better server
753          }
754        }
755        return false; // there is not a better server to place this
756      }
757
758      // check host
759      if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host
760        int host = serverIndexToHostIndex[server];
761        if (contains(primariesOfRegionsPerHost[host], primary)) {
762          // check for whether there are other hosts that we can place this region
763          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
764            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
765              return true; // meaning there is a better host
766            }
767          }
768          return false; // there is not a better host to place this
769        }
770      }
771
772      // check rack
773      if (numRacks > 1) {
774        int rack = serverIndexToRackIndex[server];
775        if (contains(primariesOfRegionsPerRack[rack], primary)) {
776          // check for whether there are other racks that we can place this region
777          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
778            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
779              return true; // meaning there is a better rack
780            }
781          }
782          return false; // there is not a better rack to place this
783        }
784      }
785      return false;
786    }
787
788    void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
789      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
790        return;
791      }
792      int server = serversToIndex.get(serverName.getHostAndPort());
793      int region = regionsToIndex.get(regionInfo);
794      doAction(new AssignRegionAction(region, server));
795    }
796
    /**
     * Update all bookkeeping after {@code region} moves from {@code oldServer}
     * (-1 for a fresh assignment) to {@code newServer}: the moved-region counter,
     * per-table region counts and maxima, and the per-server / per-host / per-rack
     * region and primary-replica index arrays.
     */
    void regionMoved(int region, int oldServer, int newServer) {
      regionIndexToServerIndex[region] = newServer;
      if (initialRegionIndexToServerIndex[region] == newServer) {
        numMovedRegions--; //region moved back to original location
      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
        numMovedRegions++; //region moved from original location
      }
      int tableIndex = regionIndexToTableIndex[region];
      if (oldServer >= 0) {
        numRegionsPerServerPerTable[oldServer][tableIndex]--;
      }
      numRegionsPerServerPerTable[newServer][tableIndex]++;

      //check whether this caused maxRegionsPerTable in the new Server to be updated
      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
          == numMaxRegionsPerTable[tableIndex]) {
        //recompute maxRegionsPerTable since the previous value was coming from the old server
        numMaxRegionsPerTable[tableIndex] = 0;
        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // update for servers
      int primary = regionIndexToPrimaryIndex[region];
      if (oldServer >= 0) {
        primariesOfRegionsPerServer[oldServer] = removeRegion(
          primariesOfRegionsPerServer[oldServer], primary);
      }
      primariesOfRegionsPerServer[newServer] = addRegionSorted(
        primariesOfRegionsPerServer[newServer], primary);

      // update for hosts
      if (multiServersPerHost) {
        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
        int newHost = serverIndexToHostIndex[newServer];
        // only touch the host arrays when the move actually crosses hosts
        if (newHost != oldHost) {
          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
          if (oldHost >= 0) {
            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
            primariesOfRegionsPerHost[oldHost] = removeRegion(
              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
          }
        }
      }

      // update for racks
      if (numRacks > 1) {
        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
        int newRack = serverIndexToRackIndex[newServer];
        // only touch the rack arrays when the move actually crosses racks
        if (newRack != oldRack) {
          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
          if (oldRack >= 0) {
            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
            primariesOfRegionsPerRack[oldRack] = removeRegion(
              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
          }
        }
      }
    }
863
864    int[] removeRegion(int[] regions, int regionIndex) {
865      //TODO: this maybe costly. Consider using linked lists
866      int[] newRegions = new int[regions.length - 1];
867      int i = 0;
868      for (i = 0; i < regions.length; i++) {
869        if (regions[i] == regionIndex) {
870          break;
871        }
872        newRegions[i] = regions[i];
873      }
874      System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
875      return newRegions;
876    }
877
878    int[] addRegion(int[] regions, int regionIndex) {
879      int[] newRegions = new int[regions.length + 1];
880      System.arraycopy(regions, 0, newRegions, 0, regions.length);
881      newRegions[newRegions.length - 1] = regionIndex;
882      return newRegions;
883    }
884
885    int[] addRegionSorted(int[] regions, int regionIndex) {
886      int[] newRegions = new int[regions.length + 1];
887      int i = 0;
888      for (i = 0; i < regions.length; i++) { // find the index to insert
889        if (regions[i] > regionIndex) {
890          break;
891        }
892      }
893      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
894      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
895      newRegions[i] = regionIndex;
896
897      return newRegions;
898    }
899
900    int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
901      int i = 0;
902      for (i = 0; i < regions.length; i++) {
903        if (regions[i] == regionIndex) {
904          regions[i] = newRegionIndex;
905          break;
906        }
907      }
908      return regions;
909    }
910
    /** Re-sort the server index array by each server's current region count. */
    void sortServersByRegionCount() {
      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
    }
914
    /** Number of regions currently hosted by the server at the given index. */
    int getNumRegions(int server) {
      return regionsPerServer[server].length;
    }
918
    /**
     * True if {@code val} occurs in {@code arr}.
     * NOTE: uses binary search, so {@code arr} must be sorted ascending — the
     * primariesOfRegionsPer* arrays are maintained sorted via addRegionSorted().
     */
    boolean contains(int[] arr, int val) {
      return Arrays.binarySearch(arr, val) >= 0;
    }
922
    // Orders server indices ascending by how many regions each server hosts.
    private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
924
925    int getLowestLocalityRegionOnServer(int serverIndex) {
926      if (regionFinder != null) {
927        float lowestLocality = 1.0f;
928        int lowestLocalityRegionIndex = -1;
929        if (regionsPerServer[serverIndex].length == 0) {
930          // No regions on that region server
931          return -1;
932        }
933        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
934          int regionIndex = regionsPerServer[serverIndex][j];
935          HDFSBlocksDistribution distribution = regionFinder
936              .getBlockDistribution(regions[regionIndex]);
937          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
938          // skip empty region
939          if (distribution.getUniqueBlocksTotalWeight() == 0) {
940            continue;
941          }
942          if (locality < lowestLocality) {
943            lowestLocality = locality;
944            lowestLocalityRegionIndex = j;
945          }
946        }
947        if (lowestLocalityRegionIndex == -1) {
948          return -1;
949        }
950        if (LOG.isTraceEnabled()) {
951          LOG.trace("Lowest locality region is "
952              + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
953                  .getRegionNameAsString() + " with locality " + lowestLocality
954              + " and its region server contains " + regionsPerServer[serverIndex].length
955              + " regions");
956        }
957        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
958      } else {
959        return -1;
960      }
961    }
962
963    float getLocalityOfRegion(int region, int server) {
964      if (regionFinder != null) {
965        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
966        return distribution.getBlockLocalityIndex(servers[server].getHostname());
967      } else {
968        return 0f;
969      }
970    }
971
    // Test-only hook to override the tracked region count.
    @VisibleForTesting
    protected void setNumRegions(int numRegions) {
      this.numRegions = numRegions;
    }
976
    // Test-only hook to override the tracked moved-region count.
    @VisibleForTesting
    protected void setNumMovedRegions(int numMovedRegions) {
      this.numMovedRegions = numMovedRegions;
    }
981
982    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
983        justification="Not important but should be fixed")
984    @Override
985    public String toString() {
986      StringBuilder desc = new StringBuilder("Cluster={servers=[");
987      for(ServerName sn:servers) {
988        desc.append(sn.getHostAndPort()).append(", ");
989      }
990      desc.append("], serverIndicesSortedByRegionCount=")
991          .append(Arrays.toString(serverIndicesSortedByRegionCount))
992          .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));
993
994      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
995          .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
996          .append(", numTables=").append(numTables).append(", numMovedRegions=")
997          .append(numMovedRegions).append('}');
998      return desc.toString();
999    }
1000  }
1001
  // slop for regions
  protected float slop;
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  protected float overallSlop;
  // active configuration; replaced whenever setConf() is called
  protected Configuration config = HBaseConfiguration.create();
  // maps servers to racks; rebuilt from config in setConf()
  protected RackManager rackManager;
  // shared randomness for assignment decisions
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
  protected MetricsBalancer metricsBalancer = null;
  // latest cluster status pushed through setClusterMetrics()
  protected ClusterMetrics clusterStatus = null;
  // the active master; cached in setMasterServices()
  protected ServerName masterServerName;
  protected MasterServices services;
  // whether only system tables may live on the master (from config)
  protected boolean onlySystemTablesOnMaster;
  // latched to true once the master reports maintenance mode; never cleared
  protected boolean maintenanceMode;
1016
1017  @Override
1018  public void setConf(Configuration conf) {
1019    this.config = conf;
1020    setSlop(conf);
1021    if (slop < 0) slop = 0;
1022    else if (slop > 1) slop = 1;
1023
1024    if (overallSlop < 0) overallSlop = 0;
1025    else if (overallSlop > 1) overallSlop = 1;
1026
1027    this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config);
1028
1029    this.rackManager = new RackManager(getConf());
1030    if (useRegionFinder) {
1031      regionFinder.setConf(conf);
1032    }
1033    // Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
1034    LOG.info("slop={}, systemTablesOnMaster={}",
1035        this.slop, this.onlySystemTablesOnMaster);
1036  }
1037
1038  protected void setSlop(Configuration conf) {
1039    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
1040    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
1041  }
1042
1043  /**
1044   * Check if a region belongs to some system table.
1045   * If so, the primary replica may be expected to be put on the master regionserver.
1046   */
1047  public boolean shouldBeOnMaster(RegionInfo region) {
1048    return (this.maintenanceMode || this.onlySystemTablesOnMaster)
1049        && region.getTable().isSystemTable();
1050  }
1051
1052  /**
1053   * Balance the regions that should be on master regionserver.
1054   */
1055  protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) {
1056    if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null;
1057    List<RegionPlan> plans = null;
1058    List<RegionInfo> regions = clusterMap.get(masterServerName);
1059    if (regions != null) {
1060      Iterator<ServerName> keyIt = null;
1061      for (RegionInfo region: regions) {
1062        if (shouldBeOnMaster(region)) continue;
1063
1064        // Find a non-master regionserver to host the region
1065        if (keyIt == null || !keyIt.hasNext()) {
1066          keyIt = clusterMap.keySet().iterator();
1067        }
1068        ServerName dest = keyIt.next();
1069        if (masterServerName.equals(dest)) {
1070          if (!keyIt.hasNext()) {
1071            keyIt = clusterMap.keySet().iterator();
1072          }
1073          dest = keyIt.next();
1074        }
1075
1076        // Move this region away from the master regionserver
1077        RegionPlan plan = new RegionPlan(region, masterServerName, dest);
1078        if (plans == null) {
1079          plans = new ArrayList<>();
1080        }
1081        plans.add(plan);
1082      }
1083    }
1084    for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) {
1085      if (masterServerName.equals(server.getKey())) continue;
1086      for (RegionInfo region: server.getValue()) {
1087        if (!shouldBeOnMaster(region)) continue;
1088
1089        // Move this region to the master regionserver
1090        RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName);
1091        if (plans == null) {
1092          plans = new ArrayList<>();
1093        }
1094        plans.add(plan);
1095      }
1096    }
1097    return plans;
1098  }
1099
1100  /**
1101   * If master is configured to carry system tables only, in here is
1102   * where we figure what to assign it.
1103   */
1104  protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions(
1105      Collection<RegionInfo> regions, List<ServerName> servers) {
1106    if (servers == null || regions == null || regions.isEmpty()) {
1107      return null;
1108    }
1109    Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
1110    if (this.maintenanceMode || this.onlySystemTablesOnMaster) {
1111      if (masterServerName != null && servers.contains(masterServerName)) {
1112        assignments.put(masterServerName, new ArrayList<>());
1113        for (RegionInfo region : regions) {
1114          if (shouldBeOnMaster(region)) {
1115            assignments.get(masterServerName).add(region);
1116          }
1117        }
1118      }
1119    }
1120    return assignments;
1121  }
1122
  /** @return the configuration last applied via {@link #setConf(Configuration)} */
  @Override
  public Configuration getConf() {
    return this.config;
  }
1127
  /**
   * Record the latest cluster metrics and, when region-location lookups are
   * enabled, propagate them to the region finder.
   */
  @Override
  public synchronized void setClusterMetrics(ClusterMetrics st) {
    this.clusterStatus = st;
    if (useRegionFinder) {
      regionFinder.setClusterMetrics(st);
    }
  }
1135
  /**
   * Intentionally a no-op in the base balancer; subclasses that need the
   * per-table cluster load override this.
   */
  @Override
  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad){

  }
1140
  /**
   * Wire in master services: caches the master's ServerName, hands the services
   * to the region finder, and latches {@link #maintenanceMode} — note it is only
   * ever set to true here, never cleared.
   */
  @Override
  public void setMasterServices(MasterServices masterServices) {
    masterServerName = masterServices.getServerName();
    this.services = masterServices;
    if (useRegionFinder) {
      this.regionFinder.setServices(masterServices);
    }
    if (this.services.isInMaintenanceMode()) {
      this.maintenanceMode = true;
    }
  }
1152
1153  @Override
1154  public void postMasterStartupInitialize() {
1155    if (services != null && regionFinder != null) {
1156      try {
1157        Set<RegionInfo> regions =
1158            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
1159        regionFinder.refreshAndWait(regions);
1160      } catch (Exception e) {
1161        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
1162      }
1163    }
1164  }
1165
  /** Replace the rack manager (e.g. for tests); normally rebuilt in setConf(). */
  public void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }
1169
1170  protected boolean needsBalance(Cluster c) {
1171    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
1172    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
1173      if (LOG.isDebugEnabled()) {
1174        LOG.debug("Not running balancer because only " + cs.getNumServers()
1175            + " active regionserver(s)");
1176      }
1177      return false;
1178    }
1179    if(areSomeRegionReplicasColocated(c)) return true;
1180    // Check if we even need to do any load balancing
1181    // HBASE-3681 check sloppiness first
1182    float average = cs.getLoadAverage(); // for logging
1183    int floor = (int) Math.floor(average * (1 - slop));
1184    int ceiling = (int) Math.ceil(average * (1 + slop));
1185    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
1186      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
1187      if (LOG.isTraceEnabled()) {
1188        // If nothing to balance, then don't say anything unless trace-level logging.
1189        LOG.trace("Skipping load balancing because balanced cluster; " +
1190          "servers=" + cs.getNumServers() +
1191          " regions=" + cs.getNumRegions() + " average=" + average +
1192          " mostloaded=" + serversByLoad.lastKey().getLoad() +
1193          " leastloaded=" + serversByLoad.firstKey().getLoad());
1194      }
1195      return false;
1196    }
1197    return true;
1198  }
1199
1200  /**
1201   * Subclasses should implement this to return true if the cluster has nodes that hosts
1202   * multiple replicas for the same region, or, if there are multiple racks and the same
1203   * rack hosts replicas of the same region
1204   * @param c Cluster information
1205   * @return whether region replicas are currently co-located
1206   */
1207  protected boolean areSomeRegionReplicasColocated(Cluster c) {
1208    return false;
1209  }
1210
1211  /**
1212   * Generates a bulk assignment plan to be used on cluster startup using a
1213   * simple round-robin assignment.
1214   * <p>
1215   * Takes a list of all the regions and all the servers in the cluster and
1216   * returns a map of each server to the regions that it should be assigned.
1217   * <p>
1218   * Currently implemented as a round-robin assignment. Same invariant as load
1219   * balancing, all servers holding floor(avg) or ceiling(avg).
1220   *
1221   * TODO: Use block locations from HDFS to place regions with their blocks
1222   *
1223   * @param regions all regions
1224   * @param servers all servers
1225   * @return map of server to the regions it should take, or null if no
1226   *         assignment is possible (ie. no regions or no servers)
1227   */
1228  @Override
1229  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
1230      List<ServerName> servers) throws HBaseIOException {
1231    metricsBalancer.incrMiscInvocations();
1232    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
1233    if (assignments != null && !assignments.isEmpty()) {
1234      servers = new ArrayList<>(servers);
1235      // Guarantee not to put other regions on master
1236      servers.remove(masterServerName);
1237      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1238      if (!masterRegions.isEmpty()) {
1239        regions = new ArrayList<>(regions);
1240        regions.removeAll(masterRegions);
1241      }
1242    }
1243    if (this.maintenanceMode || regions == null || regions.isEmpty()) {
1244      return assignments;
1245    }
1246
1247    int numServers = servers == null ? 0 : servers.size();
1248    if (numServers == 0) {
1249      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
1250      return null;
1251    }
1252
1253    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
1254    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
1255    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
1256    // and balanced. This should also run fast with fewer number of iterations.
1257
1258    if (numServers == 1) { // Only one server, nothing fancy we can do here
1259      ServerName server = servers.get(0);
1260      assignments.put(server, new ArrayList<>(regions));
1261      return assignments;
1262    }
1263
1264    Cluster cluster = createCluster(servers, regions);
1265    List<RegionInfo> unassignedRegions = new ArrayList<>();
1266
1267    roundRobinAssignment(cluster, regions, unassignedRegions,
1268      servers, assignments);
1269
1270    List<RegionInfo> lastFewRegions = new ArrayList<>();
1271    // assign the remaining by going through the list and try to assign to servers one-by-one
1272    int serverIdx = RANDOM.nextInt(numServers);
1273    for (RegionInfo region : unassignedRegions) {
1274      boolean assigned = false;
1275      for (int j = 0; j < numServers; j++) { // try all servers one by one
1276        ServerName serverName = servers.get((j + serverIdx) % numServers);
1277        if (!cluster.wouldLowerAvailability(region, serverName)) {
1278          List<RegionInfo> serverRegions =
1279              assignments.computeIfAbsent(serverName, k -> new ArrayList<>());
1280          serverRegions.add(region);
1281          cluster.doAssignRegion(region, serverName);
1282          serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
1283          assigned = true;
1284          break;
1285        }
1286      }
1287      if (!assigned) {
1288        lastFewRegions.add(region);
1289      }
1290    }
1291    // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
1292    // make it optimal later. we can end up with this if numReplicas > numServers.
1293    for (RegionInfo region : lastFewRegions) {
1294      int i = RANDOM.nextInt(numServers);
1295      ServerName server = servers.get(i);
1296      List<RegionInfo> serverRegions = assignments.computeIfAbsent(server, k -> new ArrayList<>());
1297      serverRegions.add(region);
1298      cluster.doAssignRegion(region, server);
1299    }
1300    return assignments;
1301  }
1302
1303  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions) {
1304    // Get the snapshot of the current assignments for the regions in question, and then create
1305    // a cluster out of it. Note that we might have replicas already assigned to some servers
1306    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
1307    // replicas of the regions that are passed (for performance).
1308    Map<ServerName, List<RegionInfo>> clusterState = getRegionAssignmentsByServer(regions);
1309
1310    for (ServerName server : servers) {
1311      if (!clusterState.containsKey(server)) {
1312        clusterState.put(server, EMPTY_REGION_LIST);
1313      }
1314    }
1315    return new Cluster(regions, clusterState, null, this.regionFinder,
1316        rackManager);
1317  }
1318
  /** Subset of {@code servers} that are online and currently host no regions. */
  private List<ServerName> findIdleServers(List<ServerName> servers) {
    return this.services.getServerManager()
            .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
  }
1323
1324  /**
1325   * Used to assign a single region to a random server.
1326   */
1327  @Override
1328  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
1329      throws HBaseIOException {
1330    metricsBalancer.incrMiscInvocations();
1331    if (servers != null && servers.contains(masterServerName)) {
1332      if (shouldBeOnMaster(regionInfo)) {
1333        return masterServerName;
1334      }
1335      if (!LoadBalancer.isTablesOnMaster(getConf())) {
1336        // Guarantee we do not put any regions on master
1337        servers = new ArrayList<>(servers);
1338        servers.remove(masterServerName);
1339      }
1340    }
1341
1342    int numServers = servers == null ? 0 : servers.size();
1343    if (numServers == 0) {
1344      LOG.warn("Wanted to retain assignment but no servers to assign to");
1345      return null;
1346    }
1347    if (numServers == 1) { // Only one server, nothing fancy we can do here
1348      return servers.get(0);
1349    }
1350    List<ServerName> idleServers = findIdleServers(servers);
1351    if (idleServers.size() == 1) {
1352      return idleServers.get(0);
1353    }
1354    final List<ServerName> finalServers = idleServers.isEmpty() ?
1355            servers : idleServers;
1356    List<RegionInfo> regions = Lists.newArrayList(regionInfo);
1357    Cluster cluster = createCluster(finalServers, regions);
1358    return randomAssignment(cluster, regionInfo, finalServers);
1359  }
1360
1361  /**
1362   * Generates a bulk assignment startup plan, attempting to reuse the existing
1363   * assignment information from META, but adjusting for the specified list of
1364   * available/online servers available for assignment.
1365   * <p>
1366   * Takes a map of all regions to their existing assignment from META. Also
1367   * takes a list of online servers for regions to be assigned to. Attempts to
1368   * retain all assignment, so in some instances initial assignment will not be
1369   * completely balanced.
1370   * <p>
1371   * Any leftover regions without an existing server to be assigned to will be
1372   * assigned randomly to available servers.
1373   *
1374   * @param regions regions and existing assignment from meta
1375   * @param servers available servers
1376   * @return map of servers and regions to be assigned to them
1377   */
1378  @Override
1379  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
1380      List<ServerName> servers) throws HBaseIOException {
1381    // Update metrics
1382    metricsBalancer.incrMiscInvocations();
1383    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions.keySet(), servers);
1384    if (assignments != null && !assignments.isEmpty()) {
1385      servers = new ArrayList<>(servers);
1386      // Guarantee not to put other regions on master
1387      servers.remove(masterServerName);
1388      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1389      regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey()))
1390          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
1391    }
1392    if (this.maintenanceMode || regions.isEmpty()) {
1393      return assignments;
1394    }
1395
1396    int numServers = servers == null ? 0 : servers.size();
1397    if (numServers == 0) {
1398      LOG.warn("Wanted to do retain assignment but no servers to assign to");
1399      return null;
1400    }
1401    if (numServers == 1) { // Only one server, nothing fancy we can do here
1402      ServerName server = servers.get(0);
1403      assignments.put(server, new ArrayList<>(regions.keySet()));
1404      return assignments;
1405    }
1406
1407    // Group all of the old assignments by their hostname.
1408    // We can't group directly by ServerName since the servers all have
1409    // new start-codes.
1410
1411    // Group the servers by their hostname. It's possible we have multiple
1412    // servers on the same host on different ports.
1413    ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
1414    for (ServerName server : servers) {
1415      assignments.put(server, new ArrayList<>());
1416      serversByHostname.put(server.getHostnameLowerCase(), server);
1417    }
1418
1419    // Collection of the hostnames that used to have regions
1420    // assigned, but for which we no longer have any RS running
1421    // after the cluster restart.
1422    Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
1423
1424    int numRandomAssignments = 0;
1425    int numRetainedAssigments = 0;
1426
1427    Cluster cluster = createCluster(servers, regions.keySet());
1428
1429    for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
1430      RegionInfo region = entry.getKey();
1431      ServerName oldServerName = entry.getValue();
1432      List<ServerName> localServers = new ArrayList<>();
1433      if (oldServerName != null) {
1434        localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
1435      }
1436      if (localServers.isEmpty()) {
1437        // No servers on the new cluster match up with this hostname,
1438        // assign randomly.
1439        ServerName randomServer = randomAssignment(cluster, region, servers);
1440        assignments.get(randomServer).add(region);
1441        numRandomAssignments++;
1442        if (oldServerName != null) {
1443          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
1444        }
1445      } else if (localServers.size() == 1) {
1446        // the usual case - one new server on same host
1447        ServerName target = localServers.get(0);
1448        assignments.get(target).add(region);
1449        cluster.doAssignRegion(region, target);
1450        numRetainedAssigments++;
1451      } else {
1452        // multiple new servers in the cluster on this same host
1453        if (localServers.contains(oldServerName)) {
1454          assignments.get(oldServerName).add(region);
1455          cluster.doAssignRegion(region, oldServerName);
1456        } else {
1457          ServerName target = null;
1458          for (ServerName tmp: localServers) {
1459            if (tmp.getPort() == oldServerName.getPort()) {
1460              target = tmp;
1461              break;
1462            }
1463          }
1464          if (target == null) {
1465            target = randomAssignment(cluster, region, localServers);
1466          }
1467          assignments.get(target).add(region);
1468        }
1469        numRetainedAssigments++;
1470      }
1471    }
1472
1473    String randomAssignMsg = "";
1474    if (numRandomAssignments > 0) {
1475      randomAssignMsg =
1476          numRandomAssignments + " regions were assigned "
1477              + "to random hosts, since the old hosts for these regions are no "
1478              + "longer present in the cluster. These hosts were:\n  "
1479              + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
1480    }
1481
1482    LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
1483        + " retained the pre-restart assignment. " + randomAssignMsg);
1484    return assignments;
1485  }
1486
  /** No initialization needed by the base balancer. */
  @Override
  public void initialize() throws HBaseIOException{
  }
1490
  /** No-op: the base balancer does not track individual region onlining. */
  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }
1494
  /**
   * {@inheritDoc}
   * <p>Intentionally a no-op: this balancer does not track per-region
   * offline events.
   */
  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }
1498
  /**
   * {@inheritDoc}
   * @return {@code true} once {@link #stop(String)} has been called.
   */
  @Override
  public boolean isStopped() {
    return stopped;
  }
1503
1504  @Override
1505  public void stop(String why) {
1506    LOG.info("Load Balancer stop requested: "+why);
1507    stopped = true;
1508  }
1509
1510  /**
1511   * Used to assign a single region to a random server.
1512   */
1513  private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
1514      List<ServerName> servers) {
1515    int numServers = servers.size(); // servers is not null, numServers > 1
1516    ServerName sn = null;
1517    final int maxIterations = numServers * 4;
1518    int iterations = 0;
1519
1520    do {
1521      int i = RANDOM.nextInt(numServers);
1522      sn = servers.get(i);
1523    } while (cluster.wouldLowerAvailability(regionInfo, sn)
1524        && iterations++ < maxIterations);
1525    cluster.doAssignRegion(regionInfo, sn);
1526    return sn;
1527  }
1528
1529  /**
1530   * Round robin a list of regions to a list of servers
1531   */
1532  private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
1533      List<RegionInfo> unassignedRegions, List<ServerName> servers,
1534      Map<ServerName, List<RegionInfo>> assignments) {
1535
1536    int numServers = servers.size();
1537    int numRegions = regions.size();
1538    int max = (int) Math.ceil((float) numRegions / numServers);
1539    int serverIdx = 0;
1540    if (numServers > 1) {
1541      serverIdx = RANDOM.nextInt(numServers);
1542    }
1543    int regionIdx = 0;
1544
1545    for (int j = 0; j < numServers; j++) {
1546      ServerName server = servers.get((j + serverIdx) % numServers);
1547      List<RegionInfo> serverRegions = new ArrayList<>(max);
1548      for (int i = regionIdx; i < numRegions; i += numServers) {
1549        RegionInfo region = regions.get(i % numRegions);
1550        if (cluster.wouldLowerAvailability(region, server)) {
1551          unassignedRegions.add(region);
1552        } else {
1553          serverRegions.add(region);
1554          cluster.doAssignRegion(region, server);
1555        }
1556      }
1557      assignments.put(server, serverRegions);
1558      regionIdx++;
1559    }
1560  }
1561
1562  protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
1563    Collection<RegionInfo> regions) {
1564    if (this.services != null && this.services.getAssignmentManager() != null) {
1565      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
1566    } else {
1567      return new HashMap<>();
1568    }
1569  }
1570
  /**
   * {@inheritDoc}
   * <p>Intentionally a no-op: this implementation does not react to
   * configuration changes.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
  }
1574}