001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.balancer;
020
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Comparator;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.NavigableMap;
032import java.util.Random;
033import java.util.Set;
034import java.util.TreeMap;
035import java.util.function.Predicate;
036import java.util.stream.Collectors;
037
038import org.apache.commons.lang3.NotImplementedException;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.ClusterMetrics;
041import org.apache.hadoop.hbase.HBaseConfiguration;
042import org.apache.hadoop.hbase.HBaseIOException;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.HDFSBlocksDistribution;
045import org.apache.hadoop.hbase.ServerMetrics;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionReplicaUtil;
050import org.apache.hadoop.hbase.master.LoadBalancer;
051import org.apache.hadoop.hbase.master.MasterServices;
052import org.apache.hadoop.hbase.master.RackManager;
053import org.apache.hadoop.hbase.master.RegionPlan;
054import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
055import org.apache.hadoop.hbase.master.assignment.RegionStates;
056import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
057import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
058import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
059import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
060import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
061import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
062import org.apache.yetus.audience.InterfaceAudience;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * The base class for load balancers. It provides the the functions used to by
068 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
069 * in the edge cases. It doesn't provide an implementation of the
070 * actual balancing algorithm.
071 *
072 */
073@InterfaceAudience.Private
074public abstract class BaseLoadBalancer implements LoadBalancer {
  // Minimum server count below which balancing is skipped — presumably; usage not visible in this chunk.
  protected static final int MIN_SERVER_BALANCE = 2;
  private volatile boolean stopped = false; // set when the balancer is asked to stop

  // Shared empty list used in place of null unassigned-region collections.
  private static final List<RegionInfo> EMPTY_REGION_LIST = new ArrayList<>(0);

  // Matches servers that currently host no regions at all.
  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
    = load -> load.getRegionMetrics().isEmpty();

  protected RegionLocationFinder regionFinder; // null when locality-based placement is disabled
  protected boolean useRegionFinder;           // from "hbase.master.balancer.uselocality"
085
  /** Fallback RackManager used when none is supplied: every server lands in UNKNOWN_RACK. */
  private static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      return UNKNOWN_RACK;
    }
  }
092
093  /**
094   * The constructor that uses the basic MetricsBalancer
095   */
096  protected BaseLoadBalancer() {
097    metricsBalancer = new MetricsBalancer();
098    createRegionFinder();
099  }
100
101  /**
102   * This Constructor accepts an instance of MetricsBalancer,
103   * which will be used instead of creating a new one
104   */
105  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
106    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
107    createRegionFinder();
108  }
109
  /**
   * Initializes locality support. Reads "hbase.master.balancer.uselocality"
   * (default true) from the balancer's configuration; when disabled,
   * {@code regionFinder} is left null and block locality is ignored.
   */
  private void createRegionFinder() {
    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
    if (useRegionFinder) {
      regionFinder = new RegionLocationFinder();
    }
  }
116
117  /**
118   * An efficient array based implementation similar to ClusterState for keeping
119   * the status of the cluster in terms of region assignment and distribution.
120   * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
121   * hundreds of thousands of hashmap manipulations are very costly, which is why this
122   * class uses mostly indexes and arrays.
123   *
124   * Cluster tracks a list of unassigned regions, region assignments, and the server
125   * topology in terms of server names, hostnames and racks.
126   */
127  protected static class Cluster {
    ServerName[] servers;                // serverIndex -> ServerName (newest startcode wins per host:port)
    String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
    String[] racks;                      // rackIndex -> rack name
    boolean multiServersPerHost = false; // whether or not any host has more than one server

    ArrayList<String> tables;            // tableIndex -> table name
    RegionInfo[] regions;                // regionIndex -> RegionInfo
    Deque<BalancerRegionLoad>[] regionLoads; // regionIndex -> load history, most recent last; entries may be null
    private RegionLocationFinder regionFinder; // locality lookup; null when locality is disabled

    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality

    int[]   serverIndexToHostIndex;      //serverIndex -> host index
    int[]   serverIndexToRackIndex;      //serverIndex -> rack index

    int[][] regionsPerServer;            //serverIndex -> region list
    int[]   serverIndexToRegionsOffset;  //serverIndex -> offset of region list
    int[][] regionsPerHost;              //hostIndex -> list of regions
    int[][] regionsPerRack;              //rackIndex -> region list
    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerHost;   //hostIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerRack;   //rackIndex -> sorted list of regions by primary region index

    int[][] serversPerHost;              //hostIndex -> list of server indexes
    int[][] serversPerRack;              //rackIndex -> list of server indexes
    int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
    int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
    int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
    int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
    int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
    boolean hasRegionReplicas = false;   //whether there is regions with replicas

    Integer[] serverIndicesSortedByRegionCount; // server indexes; ordered by region count elsewhere — not in this chunk
    Integer[] serverIndicesSortedByLocality;    // server indexes; ordered by locality elsewhere — not in this chunk

    Map<String, Integer> serversToIndex;     // "host:port" -> serverIndex
    Map<String, Integer> hostsToIndex;       // hostname -> hostIndex
    Map<String, Integer> racksToIndex;       // rack name -> rackIndex
    Map<String, Integer> tablesToIndex;      // table name -> tableIndex
    Map<RegionInfo, Integer> regionsToIndex; // RegionInfo -> regionIndex
    float[] localityPerServer;

    int numServers;
    int numHosts;
    int numRacks;
    int numTables;
    int numRegions;

    int numMovedRegions = 0; //num moved regions from the initial configuration
    Map<ServerName, List<RegionInfo>> clusterState; // original server -> regions map this snapshot was built from

    protected final RackManager rackManager; // never null; defaults to DefaultRackManager
    // Maps region -> rackIndex -> locality of region on rack
    private float[][] rackLocalities;
    // Maps localityType -> region -> [server|rack]Index with highest locality
    private int[][] regionsToMostLocalEntities;
185
    /**
     * Builds a Cluster with no explicitly unassigned regions; delegates to the
     * main constructor with a null unassigned-region collection.
     */
    protected Cluster(
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {
      this(null, clusterState, loads, regionFinder, rackManager);
    }
193
    /**
     * Builds the array-based snapshot of the cluster used by the balancers: indexes
     * servers, hosts, racks and tables, then records every region's assignment,
     * load history and (optionally) HDFS block locations.
     * @param unassignedRegions regions not placed on any server; may be null
     * @param clusterState current mapping of each server to the regions it hosts
     * @param loads per-region load history keyed by region name; may be null
     * @param regionFinder source of block locality, or null to skip locality
     * @param rackManager maps servers to racks; when null a single unknown rack is used
     */
    @SuppressWarnings("unchecked")
    protected Cluster(
        Collection<RegionInfo> unassignedRegions,
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {

      if (unassignedRegions == null) {
        unassignedRegions = EMPTY_REGION_LIST;
      }

      serversToIndex = new HashMap<>();
      hostsToIndex = new HashMap<>();
      racksToIndex = new HashMap<>();
      tablesToIndex = new HashMap<>();

      //TODO: We should get the list of tables from master
      tables = new ArrayList<>();
      this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

      numRegions = 0;

      List<List<Integer>> serversPerHostList = new ArrayList<>();
      List<List<Integer>> serversPerRackList = new ArrayList<>();
      this.clusterState = clusterState;
      this.regionFinder = regionFinder;

      // Use servername and port as there can be dead servers in this list. We want everything with
      // a matching hostname and port to have the same index.
      for (ServerName sn : clusterState.keySet()) {
        if (sn == null) {
          LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " +
              "skipping; unassigned regions?");
          if (LOG.isTraceEnabled()) {
            LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
          }
          continue;
        }
        // NOTE(review): the get uses getAddress().toString() while the put uses
        // getHostAndPort(); both appear to be the "host:port" form — confirm they
        // stay in sync if ServerName ever changes.
        if (serversToIndex.get(sn.getAddress().toString()) == null) {
          serversToIndex.put(sn.getHostAndPort(), numServers++);
        }
        if (!hostsToIndex.containsKey(sn.getHostname())) {
          hostsToIndex.put(sn.getHostname(), numHosts++);
          serversPerHostList.add(new ArrayList<>(1));
        }

        int serverIndex = serversToIndex.get(sn.getHostAndPort());
        int hostIndex = hostsToIndex.get(sn.getHostname());
        serversPerHostList.get(hostIndex).add(serverIndex);

        String rack = this.rackManager.getRack(sn);
        if (!racksToIndex.containsKey(rack)) {
          racksToIndex.put(rack, numRacks++);
          serversPerRackList.add(new ArrayList<>());
        }
        int rackIndex = racksToIndex.get(rack);
        serversPerRackList.get(rackIndex).add(serverIndex);
      }

      // Count how many regions there are.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        numRegions += entry.getValue().size();
      }
      numRegions += unassignedRegions.size();

      // Allocate every index array now that server/host/rack/region counts are known.
      regionsToIndex = new HashMap<>(numRegions);
      servers = new ServerName[numServers];
      serversPerHost = new int[numHosts][];
      serversPerRack = new int[numRacks][];
      regions = new RegionInfo[numRegions];
      regionIndexToServerIndex = new int[numRegions];
      initialRegionIndexToServerIndex = new int[numRegions];
      regionIndexToTableIndex = new int[numRegions];
      regionIndexToPrimaryIndex = new int[numRegions];
      regionLoads = new Deque[numRegions];

      regionLocations = new int[numRegions][];
      serverIndicesSortedByRegionCount = new Integer[numServers];
      serverIndicesSortedByLocality = new Integer[numServers];
      localityPerServer = new float[numServers];

      serverIndexToHostIndex = new int[numServers];
      serverIndexToRackIndex = new int[numServers];
      regionsPerServer = new int[numServers][];
      serverIndexToRegionsOffset = new int[numServers];
      regionsPerHost = new int[numHosts][];
      regionsPerRack = new int[numRacks][];
      primariesOfRegionsPerServer = new int[numServers][];
      primariesOfRegionsPerHost = new int[numHosts][];
      primariesOfRegionsPerRack = new int[numRacks][];

      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;

      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        if (entry.getKey() == null) {
          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
          continue;
        }
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());

        // keep the servername if this is the first server name for this hostname
        // or this servername has the newest startcode.
        if (servers[serverIndex] == null ||
            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
          servers[serverIndex] = entry.getKey();
        }

        if (regionsPerServer[serverIndex] != null) {
          // there is another server with the same hostAndPort in ClusterState.
          // allocate the array for the total size
          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
        } else {
          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
        }
        // NOTE(review): this allocation is redundantly redone in the primaries loop
        // further below; harmless but wasteful.
        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
        serverIndicesSortedByLocality[serverIndex] = serverIndex;
      }

      hosts = new String[numHosts];
      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
        hosts[entry.getValue()] = entry.getKey();
      }
      racks = new String[numRacks];
      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
        racks[entry.getValue()] = entry.getKey();
      }

      // Register each assigned region and fill server -> host/rack index mappings.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
        // resume at the offset left by a previous entry with the same host:port
        regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];

        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
        serverIndexToHostIndex[serverIndex] = hostIndex;

        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
        serverIndexToRackIndex[serverIndex] = rackIndex;

        for (RegionInfo region : entry.getValue()) {
          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
          regionIndex++;
        }
        serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
      }

      // Unassigned regions are registered with a server index of -1.
      for (RegionInfo region : unassignedRegions) {
        registerRegion(region, regionIndex, -1, loads, regionFinder);
        regionIndex++;
      }

      for (int i = 0; i < serversPerHostList.size(); i++) {
        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
        for (int j = 0; j < serversPerHost[i].length; j++) {
          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
        }
        if (serversPerHost[i].length > 1) {
          multiServersPerHost = true;
        }
      }

      for (int i = 0; i < serversPerRackList.size(); i++) {
        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
        for (int j = 0; j < serversPerRack[i].length; j++) {
          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
        }
      }

      numTables = tables.size();
      numRegionsPerServerPerTable = new int[numServers][numTables];

      for (int i = 0; i < numServers; i++) {
        for (int j = 0; j < numTables; j++) {
          numRegionsPerServerPerTable[i][j] = 0;
        }
      }

      for (int i=0; i < regionIndexToServerIndex.length; i++) {
        if (regionIndexToServerIndex[i] >= 0) {
          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
        }
      }

      // Per table, remember the largest region count hosted by any single server.
      numMaxRegionsPerTable = new int[numTables];
      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // Map every region to its primary replica's index; a primary maps to itself,
      // and an unknown primary maps to -1.
      for (int i = 0; i < regions.length; i ++) {
        RegionInfo info = regions[i];
        if (RegionReplicaUtil.isDefaultReplica(info)) {
          regionIndexToPrimaryIndex[i] = i;
        } else {
          hasRegionReplicas = true;
          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
        }
      }

      for (int i = 0; i < regionsPerServer.length; i++) {
        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
        for (int j = 0; j < regionsPerServer[i].length; j++) {
          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
          primariesOfRegionsPerServer[i][j] = primaryIndex;
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerServer[i]);
      }

      // compute regionsPerHost
      if (multiServersPerHost) {
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionsPerHost = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
          }
          regionsPerHost[i] = new int[numRegionsPerHost];
          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
        }
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionPerHostIndex = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
              int region = regionsPerServer[serversPerHost[i][j]][k];
              regionsPerHost[i][numRegionPerHostIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
              numRegionPerHostIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerHost[i]);
        }
      }

      // compute regionsPerRack
      if (numRacks > 1) {
        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionsPerRack = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
          }
          regionsPerRack[i] = new int[numRegionsPerRack];
          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
        }

        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionPerRackIndex = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
              int region = regionsPerServer[serversPerRack[i][j]][k];
              regionsPerRack[i][numRegionPerRackIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
              numRegionPerRackIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerRack[i]);
        }
      }
    }
461
462    /** Helper for Cluster constructor to handle a region */
463    private void registerRegion(RegionInfo region, int regionIndex,
464        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
465        RegionLocationFinder regionFinder) {
466      String tableName = region.getTable().getNameAsString();
467      if (!tablesToIndex.containsKey(tableName)) {
468        tables.add(tableName);
469        tablesToIndex.put(tableName, tablesToIndex.size());
470      }
471      int tableIndex = tablesToIndex.get(tableName);
472
473      regionsToIndex.put(region, regionIndex);
474      regions[regionIndex] = region;
475      regionIndexToServerIndex[regionIndex] = serverIndex;
476      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
477      regionIndexToTableIndex[regionIndex] = tableIndex;
478
479      // region load
480      if (loads != null) {
481        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
482        // That could have failed if the RegionLoad is using the other regionName
483        if (rl == null) {
484          // Try getting the region load using encoded name.
485          rl = loads.get(region.getEncodedName());
486        }
487        regionLoads[regionIndex] = rl;
488      }
489
490      if (regionFinder != null) {
491        // region location
492        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
493        regionLocations[regionIndex] = new int[loc.size()];
494        for (int i = 0; i < loc.size(); i++) {
495          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
496              : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
497                  : serversToIndex.get(loc.get(i).getHostAndPort()));
498        }
499      }
500    }
501
502    /**
503     * Returns true iff a given server has less regions than the balanced amount
504     */
505    public boolean serverHasTooFewRegions(int server) {
506      int minLoad = this.numRegions / numServers;
507      int numRegions = getNumRegions(server);
508      return numRegions < minLoad;
509    }
510
511    /**
512     * Retrieves and lazily initializes a field storing the locality of
513     * every region/server combination
514     */
515    public float[][] getOrComputeRackLocalities() {
516      if (rackLocalities == null || regionsToMostLocalEntities == null) {
517        computeCachedLocalities();
518      }
519      return rackLocalities;
520    }
521
522    /**
523     * Lazily initializes and retrieves a mapping of region -> server for which region has
524     * the highest the locality
525     */
526    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
527      if (rackLocalities == null || regionsToMostLocalEntities == null) {
528        computeCachedLocalities();
529      }
530      return regionsToMostLocalEntities[type.ordinal()];
531    }
532
533    /**
534     * Looks up locality from cache of localities. Will create cache if it does
535     * not already exist.
536     */
537    public float getOrComputeLocality(int region, int entity, LocalityType type) {
538      switch (type) {
539        case SERVER:
540          return getLocalityOfRegion(region, entity);
541        case RACK:
542          return getOrComputeRackLocalities()[region][entity];
543        default:
544          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
545      }
546    }
547
548    /**
549     * Returns locality weighted by region size in MB. Will create locality cache
550     * if it does not already exist.
551     */
552    public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
553      return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
554    }
555
556    /**
557     * Returns the size in MB from the most recent RegionLoad for region
558     */
559    public int getRegionSizeMB(int region) {
560      Deque<BalancerRegionLoad> load = regionLoads[region];
561      // This means regions have no actual data on disk
562      if (load == null) {
563        return 0;
564      }
565      return regionLoads[region].getLast().getStorefileSizeMB();
566    }
567
568    /**
569     * Computes and caches the locality for each region/rack combinations,
570     * as well as storing a mapping of region -> server and region -> rack such that server
571     * and rack have the highest locality for region
572     */
573    private void computeCachedLocalities() {
574      rackLocalities = new float[numRegions][numRacks];
575      regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
576
577      // Compute localities and find most local server per region
578      for (int region = 0; region < numRegions; region++) {
579        int serverWithBestLocality = 0;
580        float bestLocalityForRegion = 0;
581        for (int server = 0; server < numServers; server++) {
582          // Aggregate per-rack locality
583          float locality = getLocalityOfRegion(region, server);
584          int rack = serverIndexToRackIndex[server];
585          int numServersInRack = serversPerRack[rack].length;
586          rackLocalities[region][rack] += locality / numServersInRack;
587
588          if (locality > bestLocalityForRegion) {
589            serverWithBestLocality = server;
590            bestLocalityForRegion = locality;
591          }
592        }
593        regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
594
595        // Find most local rack per region
596        int rackWithBestLocality = 0;
597        float bestRackLocalityForRegion = 0.0f;
598        for (int rack = 0; rack < numRacks; rack++) {
599          float rackLocality = rackLocalities[region][rack];
600          if (rackLocality > bestRackLocalityForRegion) {
601            bestRackLocalityForRegion = rackLocality;
602            rackWithBestLocality = rack;
603          }
604        }
605        regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
606      }
607
608    }
609
610    /**
611     * Maps region index to rack index
612     */
613    public int getRackForRegion(int region) {
614      return serverIndexToRackIndex[regionIndexToServerIndex[region]];
615    }
616
    /** Granularity at which locality is measured: a single server or a whole rack. */
    enum LocalityType {
      SERVER,
      RACK
    }
621
622    /** An action to move or swap a region */
623    public static class Action {
624      public enum Type {
625        ASSIGN_REGION,
626        MOVE_REGION,
627        SWAP_REGIONS,
628        NULL,
629      }
630
631      public Type type;
632      public Action (Type type) {this.type = type;}
633      /** Returns an Action which would undo this action */
634      public Action undoAction() { return this; }
635      @Override
636      public String toString() { return type + ":";}
637    }
638
639    public static class AssignRegionAction extends Action {
640      public int region;
641      public int server;
642      public AssignRegionAction(int region, int server) {
643        super(Type.ASSIGN_REGION);
644        this.region = region;
645        this.server = server;
646      }
647      @Override
648      public Action undoAction() {
649        // TODO implement this. This action is not being used by the StochasticLB for now
650        // in case it uses it, we should implement this function.
651        throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
652      }
653      @Override
654      public String toString() {
655        return type + ": " + region + ":" + server;
656      }
657    }
658
659    public static class MoveRegionAction extends Action {
660      public int region;
661      public int fromServer;
662      public int toServer;
663
664      public MoveRegionAction(int region, int fromServer, int toServer) {
665        super(Type.MOVE_REGION);
666        this.fromServer = fromServer;
667        this.region = region;
668        this.toServer = toServer;
669      }
670      @Override
671      public Action undoAction() {
672        return new MoveRegionAction (region, toServer, fromServer);
673      }
674      @Override
675      public String toString() {
676        return type + ": " + region + ":" + fromServer + " -> " + toServer;
677      }
678    }
679
680    public static class SwapRegionsAction extends Action {
681      public int fromServer;
682      public int fromRegion;
683      public int toServer;
684      public int toRegion;
685      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
686        super(Type.SWAP_REGIONS);
687        this.fromServer = fromServer;
688        this.fromRegion = fromRegion;
689        this.toServer = toServer;
690        this.toRegion = toRegion;
691      }
692      @Override
693      public Action undoAction() {
694        return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
695      }
696      @Override
697      public String toString() {
698        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
699      }
700    }
701
    // Shared no-op action instance; returned when a balancer has no move to make.
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION",
        justification="Mistake. Too disruptive to change now")
    public static final Action NullAction = new Action(Type.NULL);
705
706    public void doAction(Action action) {
707      switch (action.type) {
708      case NULL: break;
709      case ASSIGN_REGION:
710        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
711        assert action instanceof AssignRegionAction: action.getClass();
712        AssignRegionAction ar = (AssignRegionAction) action;
713        regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
714        regionMoved(ar.region, -1, ar.server);
715        break;
716      case MOVE_REGION:
717        assert action instanceof MoveRegionAction: action.getClass();
718        MoveRegionAction mra = (MoveRegionAction) action;
719        regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
720        regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
721        regionMoved(mra.region, mra.fromServer, mra.toServer);
722        break;
723      case SWAP_REGIONS:
724        assert action instanceof SwapRegionsAction: action.getClass();
725        SwapRegionsAction a = (SwapRegionsAction) action;
726        regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
727        regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
728        regionMoved(a.fromRegion, a.fromServer, a.toServer);
729        regionMoved(a.toRegion, a.toServer, a.fromServer);
730        break;
731      default:
732        throw new RuntimeException("Uknown action:" + action.type);
733      }
734    }
735
736    /**
737     * Return true if the placement of region on server would lower the availability
738     * of the region in question
739     * @return true or false
740     */
741    boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
742      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
743        return false; // safeguard against race between cluster.servers and servers from LB method args
744      }
745      int server = serversToIndex.get(serverName.getHostAndPort());
746      int region = regionsToIndex.get(regionInfo);
747
748      int primary = regionIndexToPrimaryIndex[region];
749      // there is a subset relation for server < host < rack
750      // check server first
751
752      if (contains(primariesOfRegionsPerServer[server], primary)) {
753        // check for whether there are other servers that we can place this region
754        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
755          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
756            return true; // meaning there is a better server
757          }
758        }
759        return false; // there is not a better server to place this
760      }
761
762      // check host
763      if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host
764        int host = serverIndexToHostIndex[server];
765        if (contains(primariesOfRegionsPerHost[host], primary)) {
766          // check for whether there are other hosts that we can place this region
767          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
768            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
769              return true; // meaning there is a better host
770            }
771          }
772          return false; // there is not a better host to place this
773        }
774      }
775
776      // check rack
777      if (numRacks > 1) {
778        int rack = serverIndexToRackIndex[server];
779        if (contains(primariesOfRegionsPerRack[rack], primary)) {
780          // check for whether there are other racks that we can place this region
781          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
782            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
783              return true; // meaning there is a better rack
784            }
785          }
786          return false; // there is not a better rack to place this
787        }
788      }
789      return false;
790    }
791
792    void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
793      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
794        return;
795      }
796      int server = serversToIndex.get(serverName.getHostAndPort());
797      int region = regionsToIndex.get(regionInfo);
798      doAction(new AssignRegionAction(region, server));
799    }
800
    /**
     * Updates all bookkeeping after moving {@code region} from {@code oldServer} to
     * {@code newServer}. Pass {@code oldServer < 0} for a fresh assignment (no prior server).
     * Keeps in sync: the region-to-server map, the moved-region counter, the per-table
     * region counts and their per-table maxima, and the primary-replica index arrays
     * maintained per server, per host and per rack.
     */
    void regionMoved(int region, int oldServer, int newServer) {
      regionIndexToServerIndex[region] = newServer;
      if (initialRegionIndexToServerIndex[region] == newServer) {
        numMovedRegions--; //region moved back to original location
      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
        numMovedRegions++; //region moved from original location
      }
      int tableIndex = regionIndexToTableIndex[region];
      if (oldServer >= 0) {
        numRegionsPerServerPerTable[oldServer][tableIndex]--;
      }
      numRegionsPerServerPerTable[newServer][tableIndex]++;

      //check whether this caused maxRegionsPerTable in the new Server to be updated
      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
          == numMaxRegionsPerTable[tableIndex]) {
        //recompute maxRegionsPerTable since the previous value was coming from the old server
        numMaxRegionsPerTable[tableIndex] = 0;
        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // update for servers
      int primary = regionIndexToPrimaryIndex[region];
      if (oldServer >= 0) {
        primariesOfRegionsPerServer[oldServer] = removeRegion(
          primariesOfRegionsPerServer[oldServer], primary);
      }
      primariesOfRegionsPerServer[newServer] = addRegionSorted(
        primariesOfRegionsPerServer[newServer], primary);

      // update for hosts
      if (multiServersPerHost) {
        // oldHost is -1 for a fresh assignment; nothing to remove in that case
        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
        int newHost = serverIndexToHostIndex[newServer];
        if (newHost != oldHost) { // a move within the same host changes no host-level state
          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
          if (oldHost >= 0) {
            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
            primariesOfRegionsPerHost[oldHost] = removeRegion(
              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
          }
        }
      }

      // update for racks
      if (numRacks > 1) {
        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
        int newRack = serverIndexToRackIndex[newServer];
        if (newRack != oldRack) { // a move within the same rack changes no rack-level state
          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
          if (oldRack >= 0) {
            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
            primariesOfRegionsPerRack[oldRack] = removeRegion(
              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
          }
        }
      }
    }
867
868    int[] removeRegion(int[] regions, int regionIndex) {
869      //TODO: this maybe costly. Consider using linked lists
870      int[] newRegions = new int[regions.length - 1];
871      int i = 0;
872      for (i = 0; i < regions.length; i++) {
873        if (regions[i] == regionIndex) {
874          break;
875        }
876        newRegions[i] = regions[i];
877      }
878      System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
879      return newRegions;
880    }
881
882    int[] addRegion(int[] regions, int regionIndex) {
883      int[] newRegions = new int[regions.length + 1];
884      System.arraycopy(regions, 0, newRegions, 0, regions.length);
885      newRegions[newRegions.length - 1] = regionIndex;
886      return newRegions;
887    }
888
889    int[] addRegionSorted(int[] regions, int regionIndex) {
890      int[] newRegions = new int[regions.length + 1];
891      int i = 0;
892      for (i = 0; i < regions.length; i++) { // find the index to insert
893        if (regions[i] > regionIndex) {
894          break;
895        }
896      }
897      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
898      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
899      newRegions[i] = regionIndex;
900
901      return newRegions;
902    }
903
904    int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
905      int i = 0;
906      for (i = 0; i < regions.length; i++) {
907        if (regions[i] == regionIndex) {
908          regions[i] = newRegionIndex;
909          break;
910        }
911      }
912      return regions;
913    }
914
    // Re-sorts the cached server index array by each server's current region count (ascending).
    void sortServersByRegionCount() {
      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
    }
918
    // Number of regions currently hosted by the server at the given index.
    int getNumRegions(int server) {
      return regionsPerServer[server].length;
    }
922
    /**
     * Returns true if {@code arr} contains {@code val}.
     * Precondition: {@code arr} must be sorted ascending, since a binary search is used;
     * the primariesOfRegionsPer* arrays keep this invariant via addRegionSorted/removeRegion.
     */
    boolean contains(int[] arr, int val) {
      return Arrays.binarySearch(arr, val) >= 0;
    }
926
    // Orders server indices by their current region count (ascending); used by
    // sortServersByRegionCount().
    private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
928
929    int getLowestLocalityRegionOnServer(int serverIndex) {
930      if (regionFinder != null) {
931        float lowestLocality = 1.0f;
932        int lowestLocalityRegionIndex = -1;
933        if (regionsPerServer[serverIndex].length == 0) {
934          // No regions on that region server
935          return -1;
936        }
937        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
938          int regionIndex = regionsPerServer[serverIndex][j];
939          HDFSBlocksDistribution distribution = regionFinder
940              .getBlockDistribution(regions[regionIndex]);
941          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
942          // skip empty region
943          if (distribution.getUniqueBlocksTotalWeight() == 0) {
944            continue;
945          }
946          if (locality < lowestLocality) {
947            lowestLocality = locality;
948            lowestLocalityRegionIndex = j;
949          }
950        }
951        if (lowestLocalityRegionIndex == -1) {
952          return -1;
953        }
954        if (LOG.isTraceEnabled()) {
955          LOG.trace("Lowest locality region is "
956              + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
957                  .getRegionNameAsString() + " with locality " + lowestLocality
958              + " and its region server contains " + regionsPerServer[serverIndex].length
959              + " regions");
960        }
961        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
962      } else {
963        return -1;
964      }
965    }
966
967    float getLocalityOfRegion(int region, int server) {
968      if (regionFinder != null) {
969        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
970        return distribution.getBlockLocalityIndex(servers[server].getHostname());
971      } else {
972        return 0f;
973      }
974    }
975
    // Test hook: overrides the cached total region count.
    @VisibleForTesting
    protected void setNumRegions(int numRegions) {
      this.numRegions = numRegions;
    }
980
    // Test hook: overrides the moved-region counter maintained by regionMoved().
    @VisibleForTesting
    protected void setNumMovedRegions(int numMovedRegions) {
      this.numMovedRegions = numMovedRegions;
    }
985
986    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
987        justification="Not important but should be fixed")
988    @Override
989    public String toString() {
990      StringBuilder desc = new StringBuilder("Cluster={servers=[");
991      for(ServerName sn:servers) {
992        desc.append(sn.getHostAndPort()).append(", ");
993      }
994      desc.append("], serverIndicesSortedByRegionCount=")
995          .append(Arrays.toString(serverIndicesSortedByRegionCount))
996          .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));
997
998      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
999          .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
1000          .append(", numTables=").append(numTables).append(", numMovedRegions=")
1001          .append(numMovedRegions).append('}');
1002      return desc.toString();
1003    }
1004  }
1005
  // slop for regions
  protected float slop;
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  protected float overallSlop;
  // Active configuration; replaced wholesale by setConf().
  protected Configuration config = HBaseConfiguration.create();
  // Maps servers to racks; recreated in setConf() and replaceable via setRackManager().
  protected RackManager rackManager;
  // Shared randomness for random/round-robin assignment decisions.
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
  protected MetricsBalancer metricsBalancer = null;
  // Latest metrics pushed via setClusterMetrics(); may be null before the first push.
  protected ClusterMetrics clusterStatus = null;
  // Name of the active master, captured in setMasterServices().
  protected ServerName masterServerName;
  protected MasterServices services;
  // When true, only system-table regions may be placed on the master (from configuration).
  protected boolean onlySystemTablesOnMaster;
  // Set when the master reports it is in maintenance mode (see setMasterServices()).
  protected boolean maintenanceMode;
1020
1021  @Override
1022  public void setConf(Configuration conf) {
1023    this.config = conf;
1024    setSlop(conf);
1025    if (slop < 0) slop = 0;
1026    else if (slop > 1) slop = 1;
1027
1028    if (overallSlop < 0) overallSlop = 0;
1029    else if (overallSlop > 1) overallSlop = 1;
1030
1031    this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config);
1032
1033    this.rackManager = new RackManager(getConf());
1034    if (useRegionFinder) {
1035      regionFinder.setConf(conf);
1036    }
1037    // Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
1038    LOG.info("slop={}, systemTablesOnMaster={}",
1039        this.slop, this.onlySystemTablesOnMaster);
1040  }
1041
  // Reads the region slop ("hbase.regions.slop", default 0.2) and the overall slop,
  // which defaults to the per-server slop when not configured explicitly.
  protected void setSlop(Configuration conf) {
    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
  }
1046
1047  /**
1048   * Check if a region belongs to some system table.
1049   * If so, the primary replica may be expected to be put on the master regionserver.
1050   */
1051  public boolean shouldBeOnMaster(RegionInfo region) {
1052    return (this.maintenanceMode || this.onlySystemTablesOnMaster)
1053        && region.getTable().isSystemTable();
1054  }
1055
1056  /**
1057   * Balance the regions that should be on master regionserver.
1058   */
1059  protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) {
1060    if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null;
1061    List<RegionPlan> plans = null;
1062    List<RegionInfo> regions = clusterMap.get(masterServerName);
1063    if (regions != null) {
1064      Iterator<ServerName> keyIt = null;
1065      for (RegionInfo region: regions) {
1066        if (shouldBeOnMaster(region)) continue;
1067
1068        // Find a non-master regionserver to host the region
1069        if (keyIt == null || !keyIt.hasNext()) {
1070          keyIt = clusterMap.keySet().iterator();
1071        }
1072        ServerName dest = keyIt.next();
1073        if (masterServerName.equals(dest)) {
1074          if (!keyIt.hasNext()) {
1075            keyIt = clusterMap.keySet().iterator();
1076          }
1077          dest = keyIt.next();
1078        }
1079
1080        // Move this region away from the master regionserver
1081        RegionPlan plan = new RegionPlan(region, masterServerName, dest);
1082        if (plans == null) {
1083          plans = new ArrayList<>();
1084        }
1085        plans.add(plan);
1086      }
1087    }
1088    for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) {
1089      if (masterServerName.equals(server.getKey())) continue;
1090      for (RegionInfo region: server.getValue()) {
1091        if (!shouldBeOnMaster(region)) continue;
1092
1093        // Move this region to the master regionserver
1094        RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName);
1095        if (plans == null) {
1096          plans = new ArrayList<>();
1097        }
1098        plans.add(plan);
1099      }
1100    }
1101    return plans;
1102  }
1103
1104  /**
1105   * If master is configured to carry system tables only, in here is
1106   * where we figure what to assign it.
1107   */
1108  protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions(
1109      Collection<RegionInfo> regions, List<ServerName> servers) {
1110    if (servers == null || regions == null || regions.isEmpty()) {
1111      return null;
1112    }
1113    Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
1114    if (this.maintenanceMode || this.onlySystemTablesOnMaster) {
1115      if (masterServerName != null && servers.contains(masterServerName)) {
1116        assignments.put(masterServerName, new ArrayList<>());
1117        for (RegionInfo region : regions) {
1118          if (shouldBeOnMaster(region)) {
1119            assignments.get(masterServerName).add(region);
1120          }
1121        }
1122      }
1123    }
1124    return assignments;
1125  }
1126
  // Returns the configuration last applied via setConf() (or the default-created one).
  @Override
  public Configuration getConf() {
    return this.config;
  }
1131
  // Stores the latest cluster metrics and forwards them to the region finder when in use.
  // Synchronized so concurrent metric pushes do not interleave with each other.
  @Override
  public synchronized void setClusterMetrics(ClusterMetrics st) {
    this.clusterStatus = st;
    if (useRegionFinder) {
      regionFinder.setClusterMetrics(st);
    }
  }
1139
  @Override
  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad){
    // Intentionally a no-op here; subclasses that use per-table load override this.
  }
1144
1145  @Override
1146  public void setMasterServices(MasterServices masterServices) {
1147    masterServerName = masterServices.getServerName();
1148    this.services = masterServices;
1149    if (useRegionFinder) {
1150      this.regionFinder.setServices(masterServices);
1151    }
1152    if (this.services.isInMaintenanceMode()) {
1153      this.maintenanceMode = true;
1154    }
1155  }
1156
  // After master startup, warm up the region finder's HDFS block-distribution cache for
  // all currently assigned regions. Failures are logged and ignored (best effort).
  @Override
  public void postMasterStartupInitialize() {
    if (services != null && regionFinder != null) {
      try {
        Set<RegionInfo> regions =
            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
        regionFinder.refreshAndWait(regions);
      } catch (Exception e) {
        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
      }
    }
  }
1169
  // Replaces the rack manager created in setConf() with the supplied one.
  public void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }
1173
1174  protected boolean needsBalance(Cluster c) {
1175    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
1176    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
1177      if (LOG.isDebugEnabled()) {
1178        LOG.debug("Not running balancer because only " + cs.getNumServers()
1179            + " active regionserver(s)");
1180      }
1181      return false;
1182    }
1183    if(areSomeRegionReplicasColocated(c)) return true;
1184    // Check if we even need to do any load balancing
1185    // HBASE-3681 check sloppiness first
1186    float average = cs.getLoadAverage(); // for logging
1187    int floor = (int) Math.floor(average * (1 - slop));
1188    int ceiling = (int) Math.ceil(average * (1 + slop));
1189    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
1190      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
1191      if (LOG.isTraceEnabled()) {
1192        // If nothing to balance, then don't say anything unless trace-level logging.
1193        LOG.trace("Skipping load balancing because balanced cluster; " +
1194          "servers=" + cs.getNumServers() +
1195          " regions=" + cs.getNumRegions() + " average=" + average +
1196          " mostloaded=" + serversByLoad.lastKey().getLoad() +
1197          " leastloaded=" + serversByLoad.firstKey().getLoad());
1198      }
1199      return false;
1200    }
1201    return true;
1202  }
1203
1204  /**
1205   * Subclasses should implement this to return true if the cluster has nodes that hosts
1206   * multiple replicas for the same region, or, if there are multiple racks and the same
1207   * rack hosts replicas of the same region
1208   * @param c Cluster information
1209   * @return whether region replicas are currently co-located
1210   */
1211  protected boolean areSomeRegionReplicasColocated(Cluster c) {
1212    return false;
1213  }
1214
1215  /**
1216   * Generates a bulk assignment plan to be used on cluster startup using a
1217   * simple round-robin assignment.
1218   * <p>
1219   * Takes a list of all the regions and all the servers in the cluster and
1220   * returns a map of each server to the regions that it should be assigned.
1221   * <p>
1222   * Currently implemented as a round-robin assignment. Same invariant as load
1223   * balancing, all servers holding floor(avg) or ceiling(avg).
1224   *
1225   * TODO: Use block locations from HDFS to place regions with their blocks
1226   *
1227   * @param regions all regions
1228   * @param servers all servers
1229   * @return map of server to the regions it should take, or null if no
1230   *         assignment is possible (ie. no regions or no servers)
1231   */
1232  @Override
1233  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
1234      List<ServerName> servers) throws HBaseIOException {
1235    metricsBalancer.incrMiscInvocations();
1236    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
1237    if (assignments != null && !assignments.isEmpty()) {
1238      servers = new ArrayList<>(servers);
1239      // Guarantee not to put other regions on master
1240      servers.remove(masterServerName);
1241      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1242      if (!masterRegions.isEmpty()) {
1243        regions = new ArrayList<>(regions);
1244        regions.removeAll(masterRegions);
1245      }
1246    }
1247    if (this.maintenanceMode || regions == null || regions.isEmpty()) {
1248      return assignments;
1249    }
1250
1251    int numServers = servers == null ? 0 : servers.size();
1252    if (numServers == 0) {
1253      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
1254      return null;
1255    }
1256
1257    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
1258    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
1259    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
1260    // and balanced. This should also run fast with fewer number of iterations.
1261
1262    if (numServers == 1) { // Only one server, nothing fancy we can do here
1263      ServerName server = servers.get(0);
1264      assignments.put(server, new ArrayList<>(regions));
1265      return assignments;
1266    }
1267
1268    Cluster cluster = createCluster(servers, regions, false);
1269    List<RegionInfo> unassignedRegions = new ArrayList<>();
1270
1271    roundRobinAssignment(cluster, regions, unassignedRegions,
1272      servers, assignments);
1273
1274    List<RegionInfo> lastFewRegions = new ArrayList<>();
1275    // assign the remaining by going through the list and try to assign to servers one-by-one
1276    int serverIdx = RANDOM.nextInt(numServers);
1277    OUTER : for (RegionInfo region : unassignedRegions) {
1278      boolean assigned = false;
1279      INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
1280        ServerName serverName = servers.get((j + serverIdx) % numServers);
1281        if (!cluster.wouldLowerAvailability(region, serverName)) {
1282          List<RegionInfo> serverRegions =
1283              assignments.computeIfAbsent(serverName, k -> new ArrayList<>());
1284          if (!RegionReplicaUtil.isDefaultReplica(region.getReplicaId())) {
1285            // if the region is not a default replica
1286            // check if the assignments map has the other replica region on this server
1287            for (RegionInfo hri : serverRegions) {
1288              if (RegionReplicaUtil.isReplicasForSameRegion(region, hri)) {
1289                if (LOG.isTraceEnabled()) {
1290                  LOG.trace("Skipping the server, " + serverName
1291                      + " , got the same server for the region " + region);
1292                }
1293                // do not allow this case. The unassignedRegions we got because the
1294                // replica region in this list was not assigned because of lower availablity issue.
1295                // So when we assign here we should ensure that as far as possible the server being
1296                // selected does not have the server where the replica region was not assigned.
1297                continue INNER; // continue the inner loop, ie go to the next server
1298              }
1299            }
1300          }
1301          serverRegions.add(region);
1302          cluster.doAssignRegion(region, serverName);
1303          serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
1304          assigned = true;
1305          break;
1306        }
1307      }
1308      if (!assigned) {
1309        lastFewRegions.add(region);
1310      }
1311    }
1312    // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
1313    // make it optimal later. we can end up with this if numReplicas > numServers.
1314    for (RegionInfo region : lastFewRegions) {
1315      int i = RANDOM.nextInt(numServers);
1316      ServerName server = servers.get(i);
1317      List<RegionInfo> serverRegions = assignments.computeIfAbsent(server, k -> new ArrayList<>());
1318      serverRegions.add(region);
1319      cluster.doAssignRegion(region, server);
1320    }
1321    return assignments;
1322  }
1323
1324  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions,
1325      boolean hasRegionReplica) {
1326    // Get the snapshot of the current assignments for the regions in question, and then create
1327    // a cluster out of it. Note that we might have replicas already assigned to some servers
1328    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
1329    // replicas of the regions that are passed (for performance).
1330    Map<ServerName, List<RegionInfo>> clusterState = null;
1331    if (!hasRegionReplica) {
1332      clusterState = getRegionAssignmentsByServer(regions);
1333    } else {
1334      // for the case where we have region replica it is better we get the entire cluster's snapshot
1335      clusterState = getRegionAssignmentsByServer(null);
1336    }
1337
1338    for (ServerName server : servers) {
1339      if (!clusterState.containsKey(server)) {
1340        clusterState.put(server, EMPTY_REGION_LIST);
1341      }
1342    }
1343    return new Cluster(regions, clusterState, null, this.regionFinder,
1344        rackManager);
1345  }
1346
  // Filters the given servers down to those the server manager's idle-server predicate
  // accepts (servers currently hosting no regions are preferred assignment targets).
  private List<ServerName> findIdleServers(List<ServerName> servers) {
    return this.services.getServerManager()
            .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
  }
1351
1352  /**
1353   * Used to assign a single region to a random server.
1354   */
1355  @Override
1356  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
1357      throws HBaseIOException {
1358    metricsBalancer.incrMiscInvocations();
1359    if (servers != null && servers.contains(masterServerName)) {
1360      if (shouldBeOnMaster(regionInfo)) {
1361        return masterServerName;
1362      }
1363      if (!LoadBalancer.isTablesOnMaster(getConf())) {
1364        // Guarantee we do not put any regions on master
1365        servers = new ArrayList<>(servers);
1366        servers.remove(masterServerName);
1367      }
1368    }
1369
1370    int numServers = servers == null ? 0 : servers.size();
1371    if (numServers == 0) {
1372      LOG.warn("Wanted to retain assignment but no servers to assign to");
1373      return null;
1374    }
1375    if (numServers == 1) { // Only one server, nothing fancy we can do here
1376      return servers.get(0);
1377    }
1378    List<ServerName> idleServers = findIdleServers(servers);
1379    if (idleServers.size() == 1) {
1380      return idleServers.get(0);
1381    }
1382    final List<ServerName> finalServers = idleServers.isEmpty() ?
1383            servers : idleServers;
1384    List<RegionInfo> regions = Lists.newArrayList(regionInfo);
1385    Cluster cluster = createCluster(finalServers, regions, false);
1386    return randomAssignment(cluster, regionInfo, finalServers);
1387  }
1388
1389  /**
1390   * Generates a bulk assignment startup plan, attempting to reuse the existing
1391   * assignment information from META, but adjusting for the specified list of
1392   * available/online servers available for assignment.
1393   * <p>
1394   * Takes a map of all regions to their existing assignment from META. Also
1395   * takes a list of online servers for regions to be assigned to. Attempts to
1396   * retain all assignment, so in some instances initial assignment will not be
1397   * completely balanced.
1398   * <p>
1399   * Any leftover regions without an existing server to be assigned to will be
1400   * assigned randomly to available servers.
1401   *
1402   * @param regions regions and existing assignment from meta
1403   * @param servers available servers
1404   * @return map of servers and regions to be assigned to them
1405   */
1406  @Override
1407  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
1408      List<ServerName> servers) throws HBaseIOException {
1409    // Update metrics
1410    metricsBalancer.incrMiscInvocations();
1411    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions.keySet(), servers);
1412    if (assignments != null && !assignments.isEmpty()) {
1413      servers = new ArrayList<>(servers);
1414      // Guarantee not to put other regions on master
1415      servers.remove(masterServerName);
1416      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1417      regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey()))
1418          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
1419    }
1420    if (this.maintenanceMode || regions.isEmpty()) {
1421      return assignments;
1422    }
1423
1424    int numServers = servers == null ? 0 : servers.size();
1425    if (numServers == 0) {
1426      LOG.warn("Wanted to do retain assignment but no servers to assign to");
1427      return null;
1428    }
1429    if (numServers == 1) { // Only one server, nothing fancy we can do here
1430      ServerName server = servers.get(0);
1431      assignments.put(server, new ArrayList<>(regions.keySet()));
1432      return assignments;
1433    }
1434
1435    // Group all of the old assignments by their hostname.
1436    // We can't group directly by ServerName since the servers all have
1437    // new start-codes.
1438
1439    // Group the servers by their hostname. It's possible we have multiple
1440    // servers on the same host on different ports.
1441    ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
1442    for (ServerName server : servers) {
1443      assignments.put(server, new ArrayList<>());
1444      serversByHostname.put(server.getHostnameLowerCase(), server);
1445    }
1446
1447    // Collection of the hostnames that used to have regions
1448    // assigned, but for which we no longer have any RS running
1449    // after the cluster restart.
1450    Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
1451
1452    // If the old servers aren't present, lets assign those regions later.
1453    List<RegionInfo> randomAssignRegions = Lists.newArrayList();
1454
1455    int numRandomAssignments = 0;
1456    int numRetainedAssigments = 0;
1457    boolean hasRegionReplica = false;
1458    for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
1459      RegionInfo region = entry.getKey();
1460      ServerName oldServerName = entry.getValue();
1461      // In the current set of regions even if one has region replica let us go with
1462      // getting the entire snapshot
1463      if (this.services != null) { // for tests
1464        AssignmentManager am = this.services.getAssignmentManager();
1465        if (am != null) {
1466          RegionStates rss = am.getRegionStates();
1467          if (!hasRegionReplica && rss.isReplicaAvailableForRegion(region)) {
1468            hasRegionReplica = true;
1469          }
1470        }
1471      }
1472      List<ServerName> localServers = new ArrayList<>();
1473      if (oldServerName != null) {
1474        localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
1475      }
1476      if (localServers.isEmpty()) {
1477        // No servers on the new cluster match up with this hostname, assign randomly, later.
1478        randomAssignRegions.add(region);
1479        if (oldServerName != null) {
1480          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
1481        }
1482      } else if (localServers.size() == 1) {
1483        // the usual case - one new server on same host
1484        ServerName target = localServers.get(0);
1485        assignments.get(target).add(region);
1486        numRetainedAssigments++;
1487      } else {
1488        // multiple new servers in the cluster on this same host
1489        if (localServers.contains(oldServerName)) {
1490          assignments.get(oldServerName).add(region);
1491          numRetainedAssigments++;
1492        } else {
1493          ServerName target = null;
1494          for (ServerName tmp : localServers) {
1495            if (tmp.getPort() == oldServerName.getPort()) {
1496              target = tmp;
1497              assignments.get(tmp).add(region);
1498              numRetainedAssigments++;
1499              break;
1500            }
1501          }
1502          if (target == null) {
1503            randomAssignRegions.add(region);
1504          }
1505        }
1506      }
1507    }
1508
1509    // If servers from prior assignment aren't present, then lets do randomAssignment on regions.
1510    if (randomAssignRegions.size() > 0) {
1511      Cluster cluster = createCluster(servers, regions.keySet(), hasRegionReplica);
1512      for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
1513        ServerName sn = entry.getKey();
1514        for (RegionInfo region : entry.getValue()) {
1515          cluster.doAssignRegion(region, sn);
1516        }
1517      }
1518      for (RegionInfo region : randomAssignRegions) {
1519        ServerName target = randomAssignment(cluster, region, servers);
1520        assignments.get(target).add(region);
1521        numRandomAssignments++;
1522      }
1523    }
1524
1525    String randomAssignMsg = "";
1526    if (numRandomAssignments > 0) {
1527      randomAssignMsg =
1528          numRandomAssignments + " regions were assigned "
1529              + "to random hosts, since the old hosts for these regions are no "
1530              + "longer present in the cluster. These hosts were:\n  "
1531              + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
1532    }
1533
1534    LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
1535        + " retained the pre-restart assignment. " + randomAssignMsg);
1536    return assignments;
1537  }
1538
  /**
   * Intentionally a no-op: this base implementation requires no extra initialization.
   */
  @Override
  public void initialize() throws HBaseIOException{
  }
1542
  /**
   * Intentionally a no-op: this base implementation does not track individual
   * region-online events.
   */
  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }
1546
  /**
   * Intentionally a no-op: this base implementation does not track individual
   * region-offline events.
   */
  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }
1550
  /**
   * @return true once {@link #stop(String)} has been called on this balancer.
   */
  @Override
  public boolean isStopped() {
    return stopped;
  }
1555
1556  @Override
1557  public void stop(String why) {
1558    LOG.info("Load Balancer stop requested: "+why);
1559    stopped = true;
1560  }
1561
1562  /**
1563  * Updates the balancer status tag reported to JMX
1564  */
1565  public void updateBalancerStatus(boolean status) {
1566    metricsBalancer.balancerStatus(status);
1567  }
1568
1569  /**
1570   * Used to assign a single region to a random server.
1571   */
1572  private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
1573      List<ServerName> servers) {
1574    int numServers = servers.size(); // servers is not null, numServers > 1
1575    ServerName sn = null;
1576    final int maxIterations = numServers * 4;
1577    int iterations = 0;
1578    List<ServerName> usedSNs = new ArrayList<>(servers.size());
1579    do {
1580      int i = RANDOM.nextInt(numServers);
1581      sn = servers.get(i);
1582      if (!usedSNs.contains(sn)) {
1583        usedSNs.add(sn);
1584      }
1585    } while (cluster.wouldLowerAvailability(regionInfo, sn)
1586        && iterations++ < maxIterations);
1587    if (iterations >= maxIterations) {
1588      // We have reached the max. Means the servers that we collected is still lowering the
1589      // availability
1590      for (ServerName unusedServer : servers) {
1591        if (!usedSNs.contains(unusedServer)) {
1592          // check if any other unused server is there for us to use.
1593          // If so use it. Else we have not other go but to go with one of them
1594          if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
1595            sn = unusedServer;
1596            break;
1597          }
1598        }
1599      }
1600    }
1601    cluster.doAssignRegion(regionInfo, sn);
1602    return sn;
1603  }
1604
1605  /**
1606   * Round robin a list of regions to a list of servers
1607   */
1608  private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
1609      List<RegionInfo> unassignedRegions, List<ServerName> servers,
1610      Map<ServerName, List<RegionInfo>> assignments) {
1611
1612    int numServers = servers.size();
1613    int numRegions = regions.size();
1614    int max = (int) Math.ceil((float) numRegions / numServers);
1615    int serverIdx = 0;
1616    if (numServers > 1) {
1617      serverIdx = RANDOM.nextInt(numServers);
1618    }
1619    int regionIdx = 0;
1620
1621    for (int j = 0; j < numServers; j++) {
1622      ServerName server = servers.get((j + serverIdx) % numServers);
1623      List<RegionInfo> serverRegions = new ArrayList<>(max);
1624      for (int i = regionIdx; i < numRegions; i += numServers) {
1625        RegionInfo region = regions.get(i % numRegions);
1626        if (cluster.wouldLowerAvailability(region, server)) {
1627          unassignedRegions.add(region);
1628        } else {
1629          serverRegions.add(region);
1630          cluster.doAssignRegion(region, server);
1631        }
1632      }
1633      assignments.put(server, serverRegions);
1634      regionIdx++;
1635    }
1636  }
1637
1638  protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
1639    Collection<RegionInfo> regions) {
1640    if (this.services != null && this.services.getAssignmentManager() != null) {
1641      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
1642    } else {
1643      return new HashMap<>();
1644    }
1645  }
1646
  /**
   * Intentionally a no-op: this base implementation does not react to runtime
   * configuration changes.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
  }
1650}