001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.balancer;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.Comparator;
027import java.util.Deque;
028import java.util.HashMap;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Map;
032import java.util.Map.Entry;
033import java.util.NavigableMap;
034import java.util.Random;
035import java.util.Set;
036import java.util.TreeMap;
037import java.util.function.Predicate;
038import java.util.stream.Collectors;
039
040import org.apache.commons.lang3.NotImplementedException;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.ClusterMetrics;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseIOException;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HDFSBlocksDistribution;
047import org.apache.hadoop.hbase.ServerMetrics;
048import org.apache.hadoop.hbase.ServerName;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.RegionReplicaUtil;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.master.LoadBalancer;
054import org.apache.hadoop.hbase.master.MasterServices;
055import org.apache.hadoop.hbase.master.RackManager;
056import org.apache.hadoop.hbase.master.RegionPlan;
057import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
060import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
061import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
062import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
063import org.apache.yetus.audience.InterfaceAudience;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067/**
068 * The base class for load balancers. It provides the the functions used to by
069 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
070 * in the edge cases. It doesn't provide an implementation of the
071 * actual balancing algorithm.
072 *
073 */
074@InterfaceAudience.Private
075public abstract class BaseLoadBalancer implements LoadBalancer {
  // Minimum server count below which balancing is not worthwhile
  // (presumably checked by subclasses before producing plans — not visible here).
  protected static final int MIN_SERVER_BALANCE = 2;
  // Set when the balancer is stopped; volatile for cross-thread visibility.
  private volatile boolean stopped = false;

  // Shared immutable empty list, used when callers pass no unassigned regions.
  private static final List<RegionInfo> EMPTY_REGION_LIST = Collections.emptyList();

  // Matches servers that currently host no regions at all ("idle" servers).
  // NOTE: name keeps the historical misspelling ("PREDICATOR") for compatibility.
  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
    = load -> load.getRegionMetrics().isEmpty();

  // HDFS-locality lookup helper; only created when locality use is enabled (see createRegionFinder).
  protected RegionLocationFinder regionFinder;
  protected boolean useRegionFinder;
086
  /** Fallback {@link RackManager} that places every server in the single unknown rack. */
  private static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      return UNKNOWN_RACK;
    }
  }
093
094  /**
095   * The constructor that uses the basic MetricsBalancer
096   */
097  protected BaseLoadBalancer() {
098    metricsBalancer = new MetricsBalancer();
099    createRegionFinder();
100  }
101
102  /**
103   * This Constructor accepts an instance of MetricsBalancer,
104   * which will be used instead of creating a new one
105   */
106  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
107    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
108    createRegionFinder();
109  }
110
111  private void createRegionFinder() {
112    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
113    if (useRegionFinder) {
114      regionFinder = new RegionLocationFinder();
115    }
116  }
117
118  /**
119   * An efficient array based implementation similar to ClusterState for keeping
120   * the status of the cluster in terms of region assignment and distribution.
121   * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
122   * hundreds of thousands of hashmap manipulations are very costly, which is why this
123   * class uses mostly indexes and arrays.
124   *
125   * Cluster tracks a list of unassigned regions, region assignments, and the server
126   * topology in terms of server names, hostnames and racks.
127   */
128  protected static class Cluster {
    ServerName[] servers;                // serverIndex -> representative ServerName
    String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
    String[] racks;                      // rackIndex -> rack name
    boolean multiServersPerHost = false; // whether or not any host has more than one server

    ArrayList<String> tables;            // tableIndex -> table name
    RegionInfo[] regions;                // regionIndex -> RegionInfo
    Deque<BalancerRegionLoad>[] regionLoads; // regionIndex -> recent load history (may be null)
    private RegionLocationFinder regionFinder;

    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality

    int[]   serverIndexToHostIndex;      //serverIndex -> host index
    int[]   serverIndexToRackIndex;      //serverIndex -> rack index

    int[][] regionsPerServer;            //serverIndex -> region list
    int[]   serverIndexToRegionsOffset;  //serverIndex -> offset of region list
    int[][] regionsPerHost;              //hostIndex -> list of regions
    int[][] regionsPerRack;              //rackIndex -> region list
    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerHost;   //hostIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerRack;   //rackIndex -> sorted list of regions by primary region index

    int[][] serversPerHost;              //hostIndex -> list of server indexes
    int[][] serversPerRack;              //rackIndex -> list of server indexes
    int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
    int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
    int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
    int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
    int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
    boolean hasRegionReplicas = false;   //whether there is regions with replicas

    Integer[] serverIndicesSortedByRegionCount;  // boxed for comparator-based sorting
    Integer[] serverIndicesSortedByLocality;

    Map<String, Integer> serversToIndex;   // "host:port" -> serverIndex
    Map<String, Integer> hostsToIndex;     // hostname -> hostIndex
    Map<String, Integer> racksToIndex;     // rack name -> rackIndex
    Map<String, Integer> tablesToIndex;    // table name -> tableIndex
    Map<RegionInfo, Integer> regionsToIndex;  // RegionInfo -> regionIndex
    float[] localityPerServer;

    int numServers;
    int numHosts;
    int numRacks;
    int numTables;
    int numRegions;

    int numMovedRegions = 0; //num moved regions from the initial configuration
    Map<ServerName, List<RegionInfo>> clusterState;  // original snapshot this model was built from

    protected final RackManager rackManager;
    // Maps region -> rackIndex -> locality of region on rack
    private float[][] rackLocalities;
    // Maps localityType -> region -> [server|rack]Index with highest locality
    private int[][] regionsToMostLocalEntities;
186
    /**
     * Convenience constructor for the common case of no unassigned regions;
     * delegates to the main constructor with {@code null} for the unassigned list.
     */
    protected Cluster(
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {
      this(null, clusterState, loads, regionFinder, rackManager);
    }
194
    /**
     * Builds the array-based cluster model from a snapshot of region assignments.
     *
     * @param unassignedRegions regions with no current server; {@code null} is treated as empty
     * @param clusterState      server -> hosted regions, the snapshot to model
     * @param loads             per-region load history keyed by region name (may be null)
     * @param regionFinder      HDFS-locality lookup, or null when locality is disabled
     * @param rackManager       rack topology source; defaults to a single unknown rack
     */
    @SuppressWarnings("unchecked")
    protected Cluster(
        Collection<RegionInfo> unassignedRegions,
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {

      if (unassignedRegions == null) {
        unassignedRegions = EMPTY_REGION_LIST;
      }

      serversToIndex = new HashMap<>();
      hostsToIndex = new HashMap<>();
      racksToIndex = new HashMap<>();
      tablesToIndex = new HashMap<>();

      //TODO: We should get the list of tables from master
      tables = new ArrayList<>();
      this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

      numRegions = 0;

      List<List<Integer>> serversPerHostList = new ArrayList<>();
      List<List<Integer>> serversPerRackList = new ArrayList<>();
      this.clusterState = clusterState;
      this.regionFinder = regionFinder;

      // Use servername and port as there can be dead servers in this list. We want everything with
      // a matching hostname and port to have the same index.
      for (ServerName sn : clusterState.keySet()) {
        if (sn == null) {
          LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " +
              "skipping; unassigned regions?");
          if (LOG.isTraceEnabled()) {
            LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
          }
          continue;
        }
        // NOTE(review): the lookup key is getAddress().toString() while the put key is
        // getHostAndPort(); both render as "host:port" so they agree today — confirm if
        // ServerName formatting ever diverges.
        if (serversToIndex.get(sn.getAddress().toString()) == null) {
          serversToIndex.put(sn.getHostAndPort(), numServers++);
        }
        if (!hostsToIndex.containsKey(sn.getHostname())) {
          hostsToIndex.put(sn.getHostname(), numHosts++);
          serversPerHostList.add(new ArrayList<>(1));
        }

        int serverIndex = serversToIndex.get(sn.getHostAndPort());
        int hostIndex = hostsToIndex.get(sn.getHostname());
        serversPerHostList.get(hostIndex).add(serverIndex);

        String rack = this.rackManager.getRack(sn);
        if (!racksToIndex.containsKey(rack)) {
          racksToIndex.put(rack, numRacks++);
          serversPerRackList.add(new ArrayList<>());
        }
        int rackIndex = racksToIndex.get(rack);
        serversPerRackList.get(rackIndex).add(serverIndex);
      }

      // Count how many regions there are.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        numRegions += entry.getValue().size();
      }
      numRegions += unassignedRegions.size();

      // Allocate the index-keyed arrays now that all counts are known.
      regionsToIndex = new HashMap<>(numRegions);
      servers = new ServerName[numServers];
      serversPerHost = new int[numHosts][];
      serversPerRack = new int[numRacks][];
      regions = new RegionInfo[numRegions];
      regionIndexToServerIndex = new int[numRegions];
      initialRegionIndexToServerIndex = new int[numRegions];
      regionIndexToTableIndex = new int[numRegions];
      regionIndexToPrimaryIndex = new int[numRegions];
      regionLoads = new Deque[numRegions];

      regionLocations = new int[numRegions][];
      serverIndicesSortedByRegionCount = new Integer[numServers];
      serverIndicesSortedByLocality = new Integer[numServers];
      localityPerServer = new float[numServers];

      serverIndexToHostIndex = new int[numServers];
      serverIndexToRackIndex = new int[numServers];
      regionsPerServer = new int[numServers][];
      serverIndexToRegionsOffset = new int[numServers];
      regionsPerHost = new int[numHosts][];
      regionsPerRack = new int[numRacks][];
      primariesOfRegionsPerServer = new int[numServers][];
      primariesOfRegionsPerHost = new int[numHosts][];
      primariesOfRegionsPerRack = new int[numRacks][];

      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;

      // Pick one representative ServerName per host:port (newest startcode wins)
      // and size the per-server region arrays.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        if (entry.getKey() == null) {
          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
          continue;
        }
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());

        // keep the servername if this is the first server name for this hostname
        // or this servername has the newest startcode.
        if (servers[serverIndex] == null ||
            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
          servers[serverIndex] = entry.getKey();
        }

        if (regionsPerServer[serverIndex] != null) {
          // there is another server with the same hostAndPort in ClusterState.
          // allocate the array for the total size
          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
        } else {
          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
        }
        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
        serverIndicesSortedByLocality[serverIndex] = serverIndex;
      }

      hosts = new String[numHosts];
      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
        hosts[entry.getValue()] = entry.getKey();
      }
      racks = new String[numRacks];
      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
        racks[entry.getValue()] = entry.getKey();
      }

      // Register each assigned region and record server -> host/rack mappings.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
        regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];

        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
        serverIndexToHostIndex[serverIndex] = hostIndex;

        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
        serverIndexToRackIndex[serverIndex] = rackIndex;

        for (RegionInfo region : entry.getValue()) {
          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
          regionIndex++;
        }
        serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
      }

      // Unassigned regions are registered with server index -1.
      for (RegionInfo region : unassignedRegions) {
        registerRegion(region, regionIndex, -1, loads, regionFinder);
        regionIndex++;
      }

      for (int i = 0; i < serversPerHostList.size(); i++) {
        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
        for (int j = 0; j < serversPerHost[i].length; j++) {
          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
        }
        if (serversPerHost[i].length > 1) {
          multiServersPerHost = true;
        }
      }

      for (int i = 0; i < serversPerRackList.size(); i++) {
        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
        for (int j = 0; j < serversPerRack[i].length; j++) {
          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
        }
      }

      // Per-table region counts per server, then per-table maxima across servers.
      numTables = tables.size();
      numRegionsPerServerPerTable = new int[numServers][numTables];

      for (int i = 0; i < numServers; i++) {
        for (int j = 0; j < numTables; j++) {
          numRegionsPerServerPerTable[i][j] = 0;
        }
      }

      for (int i=0; i < regionIndexToServerIndex.length; i++) {
        if (regionIndexToServerIndex[i] >= 0) {
          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
        }
      }

      numMaxRegionsPerTable = new int[numTables];
      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // Map each region to the index of its primary replica (-1 when the primary
      // is not present in this snapshot).
      for (int i = 0; i < regions.length; i ++) {
        RegionInfo info = regions[i];
        if (RegionReplicaUtil.isDefaultReplica(info)) {
          regionIndexToPrimaryIndex[i] = i;
        } else {
          hasRegionReplicas = true;
          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
        }
      }

      for (int i = 0; i < regionsPerServer.length; i++) {
        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
        for (int j = 0; j < regionsPerServer[i].length; j++) {
          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
          primariesOfRegionsPerServer[i][j] = primaryIndex;
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerServer[i]);
      }

      // compute regionsPerHost
      if (multiServersPerHost) {
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionsPerHost = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
          }
          regionsPerHost[i] = new int[numRegionsPerHost];
          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
        }
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionPerHostIndex = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
              int region = regionsPerServer[serversPerHost[i][j]][k];
              regionsPerHost[i][numRegionPerHostIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
              numRegionPerHostIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerHost[i]);
        }
      }

      // compute regionsPerRack
      if (numRacks > 1) {
        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionsPerRack = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
          }
          regionsPerRack[i] = new int[numRegionsPerRack];
          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
        }

        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionPerRackIndex = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
              int region = regionsPerServer[serversPerRack[i][j]][k];
              regionsPerRack[i][numRegionPerRackIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
              numRegionPerRackIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerRack[i]);
        }
      }
    }
462
463    /** Helper for Cluster constructor to handle a region */
464    private void registerRegion(RegionInfo region, int regionIndex,
465        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
466        RegionLocationFinder regionFinder) {
467      String tableName = region.getTable().getNameAsString();
468      if (!tablesToIndex.containsKey(tableName)) {
469        tables.add(tableName);
470        tablesToIndex.put(tableName, tablesToIndex.size());
471      }
472      int tableIndex = tablesToIndex.get(tableName);
473
474      regionsToIndex.put(region, regionIndex);
475      regions[regionIndex] = region;
476      regionIndexToServerIndex[regionIndex] = serverIndex;
477      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
478      regionIndexToTableIndex[regionIndex] = tableIndex;
479
480      // region load
481      if (loads != null) {
482        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
483        // That could have failed if the RegionLoad is using the other regionName
484        if (rl == null) {
485          // Try getting the region load using encoded name.
486          rl = loads.get(region.getEncodedName());
487        }
488        regionLoads[regionIndex] = rl;
489      }
490
491      if (regionFinder != null) {
492        // region location
493        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
494        regionLocations[regionIndex] = new int[loc.size()];
495        for (int i = 0; i < loc.size(); i++) {
496          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
497              : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
498                  : serversToIndex.get(loc.get(i).getHostAndPort()));
499        }
500      }
501    }
502
503    /**
504     * Returns true iff a given server has less regions than the balanced amount
505     */
506    public boolean serverHasTooFewRegions(int server) {
507      int minLoad = this.numRegions / numServers;
508      int numRegions = getNumRegions(server);
509      return numRegions < minLoad;
510    }
511
512    /**
513     * Retrieves and lazily initializes a field storing the locality of
514     * every region/server combination
515     */
516    public float[][] getOrComputeRackLocalities() {
517      if (rackLocalities == null || regionsToMostLocalEntities == null) {
518        computeCachedLocalities();
519      }
520      return rackLocalities;
521    }
522
523    /**
524     * Lazily initializes and retrieves a mapping of region -> server for which region has
525     * the highest the locality
526     */
527    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
528      if (rackLocalities == null || regionsToMostLocalEntities == null) {
529        computeCachedLocalities();
530      }
531      return regionsToMostLocalEntities[type.ordinal()];
532    }
533
534    /**
535     * Looks up locality from cache of localities. Will create cache if it does
536     * not already exist.
537     */
538    public float getOrComputeLocality(int region, int entity, LocalityType type) {
539      switch (type) {
540        case SERVER:
541          return getLocalityOfRegion(region, entity);
542        case RACK:
543          return getOrComputeRackLocalities()[region][entity];
544        default:
545          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
546      }
547    }
548
549    /**
550     * Returns locality weighted by region size in MB. Will create locality cache
551     * if it does not already exist.
552     */
553    public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
554      return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
555    }
556
557    /**
558     * Returns the size in MB from the most recent RegionLoad for region
559     */
560    public int getRegionSizeMB(int region) {
561      Deque<BalancerRegionLoad> load = regionLoads[region];
562      // This means regions have no actual data on disk
563      if (load == null) {
564        return 0;
565      }
566      return regionLoads[region].getLast().getStorefileSizeMB();
567    }
568
569    /**
570     * Computes and caches the locality for each region/rack combinations,
571     * as well as storing a mapping of region -> server and region -> rack such that server
572     * and rack have the highest locality for region
573     */
574    private void computeCachedLocalities() {
575      rackLocalities = new float[numRegions][numRacks];
576      regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
577
578      // Compute localities and find most local server per region
579      for (int region = 0; region < numRegions; region++) {
580        int serverWithBestLocality = 0;
581        float bestLocalityForRegion = 0;
582        for (int server = 0; server < numServers; server++) {
583          // Aggregate per-rack locality
584          float locality = getLocalityOfRegion(region, server);
585          int rack = serverIndexToRackIndex[server];
586          int numServersInRack = serversPerRack[rack].length;
587          rackLocalities[region][rack] += locality / numServersInRack;
588
589          if (locality > bestLocalityForRegion) {
590            serverWithBestLocality = server;
591            bestLocalityForRegion = locality;
592          }
593        }
594        regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
595
596        // Find most local rack per region
597        int rackWithBestLocality = 0;
598        float bestRackLocalityForRegion = 0.0f;
599        for (int rack = 0; rack < numRacks; rack++) {
600          float rackLocality = rackLocalities[region][rack];
601          if (rackLocality > bestRackLocalityForRegion) {
602            bestRackLocalityForRegion = rackLocality;
603            rackWithBestLocality = rack;
604          }
605        }
606        regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
607      }
608
609    }
610
611    /**
612     * Maps region index to rack index
613     */
614    public int getRackForRegion(int region) {
615      return serverIndexToRackIndex[regionIndexToServerIndex[region]];
616    }
617
    /** Kinds of entity a locality value can be computed against. */
    enum LocalityType {
      SERVER,
      RACK
    }
622
623    /** An action to move or swap a region */
624    public static class Action {
625      public enum Type {
626        ASSIGN_REGION,
627        MOVE_REGION,
628        SWAP_REGIONS,
629        NULL,
630      }
631
632      public Type type;
633      public Action (Type type) {this.type = type;}
634      /** Returns an Action which would undo this action */
635      public Action undoAction() { return this; }
636      @Override
637      public String toString() { return type + ":";}
638    }
639
640    public static class AssignRegionAction extends Action {
641      public int region;
642      public int server;
643      public AssignRegionAction(int region, int server) {
644        super(Type.ASSIGN_REGION);
645        this.region = region;
646        this.server = server;
647      }
648      @Override
649      public Action undoAction() {
650        // TODO implement this. This action is not being used by the StochasticLB for now
651        // in case it uses it, we should implement this function.
652        throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
653      }
654      @Override
655      public String toString() {
656        return type + ": " + region + ":" + server;
657      }
658    }
659
660    public static class MoveRegionAction extends Action {
661      public int region;
662      public int fromServer;
663      public int toServer;
664
665      public MoveRegionAction(int region, int fromServer, int toServer) {
666        super(Type.MOVE_REGION);
667        this.fromServer = fromServer;
668        this.region = region;
669        this.toServer = toServer;
670      }
671      @Override
672      public Action undoAction() {
673        return new MoveRegionAction (region, toServer, fromServer);
674      }
675      @Override
676      public String toString() {
677        return type + ": " + region + ":" + fromServer + " -> " + toServer;
678      }
679    }
680
681    public static class SwapRegionsAction extends Action {
682      public int fromServer;
683      public int fromRegion;
684      public int toServer;
685      public int toRegion;
686      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
687        super(Type.SWAP_REGIONS);
688        this.fromServer = fromServer;
689        this.fromRegion = fromRegion;
690        this.toServer = toServer;
691        this.toRegion = toRegion;
692      }
693      @Override
694      public Action undoAction() {
695        return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
696      }
697      @Override
698      public String toString() {
699        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
700      }
701    }
702
    // Shared no-op action; balancers return this when no change is proposed.
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION",
        justification="Mistake. Too disruptive to change now")
    public static final Action NullAction = new Action(Type.NULL);
706
707    public void doAction(Action action) {
708      switch (action.type) {
709      case NULL: break;
710      case ASSIGN_REGION:
711        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
712        assert action instanceof AssignRegionAction: action.getClass();
713        AssignRegionAction ar = (AssignRegionAction) action;
714        regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
715        regionMoved(ar.region, -1, ar.server);
716        break;
717      case MOVE_REGION:
718        assert action instanceof MoveRegionAction: action.getClass();
719        MoveRegionAction mra = (MoveRegionAction) action;
720        regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
721        regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
722        regionMoved(mra.region, mra.fromServer, mra.toServer);
723        break;
724      case SWAP_REGIONS:
725        assert action instanceof SwapRegionsAction: action.getClass();
726        SwapRegionsAction a = (SwapRegionsAction) action;
727        regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
728        regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
729        regionMoved(a.fromRegion, a.fromServer, a.toServer);
730        regionMoved(a.toRegion, a.toServer, a.fromServer);
731        break;
732      default:
733        throw new RuntimeException("Uknown action:" + action.type);
734      }
735    }
736
737    /**
738     * Return true if the placement of region on server would lower the availability
739     * of the region in question
740     * @return true or false
741     */
    boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
        return false; // safeguard against race between cluster.servers and servers from LB method args
      }
      int server = serversToIndex.get(serverName.getHostAndPort());
      int region = regionsToIndex.get(regionInfo);

      // index of the primary replica of this region's replica group
      int primary = regionIndexToPrimaryIndex[region];
      // there is a subset relation for server < host < rack
      // check server first

      // placement lowers availability when the target server already hosts a
      // replica of the same primary AND some other server does not (i.e. a
      // strictly better placement exists)
      if (contains(primariesOfRegionsPerServer[server], primary)) {
        // check for whether there are other servers that we can place this region
        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
            return true; // meaning there is a better server
          }
        }
        return false; // there is not a better server to place this
      }

      // check host
      if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host
        int host = serverIndexToHostIndex[server];
        if (contains(primariesOfRegionsPerHost[host], primary)) {
          // check for whether there are other hosts that we can place this region
          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
              return true; // meaning there is a better host
            }
          }
          return false; // there is not a better host to place this
        }
      }

      // check rack
      if (numRacks > 1) {
        int rack = serverIndexToRackIndex[server];
        if (contains(primariesOfRegionsPerRack[rack], primary)) {
          // check for whether there are other racks that we can place this region
          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
              return true; // meaning there is a better rack
            }
          }
          return false; // there is not a better rack to place this
        }
      }
      return false;
    }
792
    /**
     * Applies an assignment of {@code regionInfo} to {@code serverName} to this
     * in-memory cluster state. Silently ignores servers unknown to this snapshot
     * (guards against races with the live server list).
     */
    void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
        return;
      }
      int server = serversToIndex.get(serverName.getHostAndPort());
      int region = regionsToIndex.get(regionInfo);
      doAction(new AssignRegionAction(region, server));
    }
801
    /**
     * Updates all incremental bookkeeping after {@code region} moves from
     * {@code oldServer} to {@code newServer}. {@code oldServer} may be -1 for a
     * fresh assignment. Maintains the moved-region counter, per-table region
     * counts and maxima, and the region/primary membership arrays for servers,
     * hosts and racks.
     */
    void regionMoved(int region, int oldServer, int newServer) {
      regionIndexToServerIndex[region] = newServer;
      if (initialRegionIndexToServerIndex[region] == newServer) {
        numMovedRegions--; //region moved back to original location
      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
        numMovedRegions++; //region moved from original location
      }
      int tableIndex = regionIndexToTableIndex[region];
      if (oldServer >= 0) {
        numRegionsPerServerPerTable[oldServer][tableIndex]--;
      }
      numRegionsPerServerPerTable[newServer][tableIndex]++;

      //check whether this caused maxRegionsPerTable in the new Server to be updated
      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
          == numMaxRegionsPerTable[tableIndex]) {
        //recompute maxRegionsPerTable since the previous value was coming from the old server
        numMaxRegionsPerTable[tableIndex] = 0;
        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // update for servers
      int primary = regionIndexToPrimaryIndex[region];
      if (oldServer >= 0) {
        primariesOfRegionsPerServer[oldServer] = removeRegion(
          primariesOfRegionsPerServer[oldServer], primary);
      }
      primariesOfRegionsPerServer[newServer] = addRegionSorted(
        primariesOfRegionsPerServer[newServer], primary);

      // update for hosts
      if (multiServersPerHost) {
        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
        int newHost = serverIndexToHostIndex[newServer];
        if (newHost != oldHost) {
          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
          if (oldHost >= 0) {
            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
            primariesOfRegionsPerHost[oldHost] = removeRegion(
              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
          }
        }
      }

      // update for racks
      if (numRacks > 1) {
        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
        int newRack = serverIndexToRackIndex[newServer];
        if (newRack != oldRack) {
          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
          if (oldRack >= 0) {
            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
            primariesOfRegionsPerRack[oldRack] = removeRegion(
              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
          }
        }
      }
    }
868
869    int[] removeRegion(int[] regions, int regionIndex) {
870      //TODO: this maybe costly. Consider using linked lists
871      int[] newRegions = new int[regions.length - 1];
872      int i = 0;
873      for (i = 0; i < regions.length; i++) {
874        if (regions[i] == regionIndex) {
875          break;
876        }
877        newRegions[i] = regions[i];
878      }
879      System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
880      return newRegions;
881    }
882
883    int[] addRegion(int[] regions, int regionIndex) {
884      int[] newRegions = new int[regions.length + 1];
885      System.arraycopy(regions, 0, newRegions, 0, regions.length);
886      newRegions[newRegions.length - 1] = regionIndex;
887      return newRegions;
888    }
889
890    int[] addRegionSorted(int[] regions, int regionIndex) {
891      int[] newRegions = new int[regions.length + 1];
892      int i = 0;
893      for (i = 0; i < regions.length; i++) { // find the index to insert
894        if (regions[i] > regionIndex) {
895          break;
896        }
897      }
898      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
899      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
900      newRegions[i] = regionIndex;
901
902      return newRegions;
903    }
904
905    int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
906      int i = 0;
907      for (i = 0; i < regions.length; i++) {
908        if (regions[i] == regionIndex) {
909          regions[i] = newRegionIndex;
910          break;
911        }
912      }
913      return regions;
914    }
915
    /** Re-sorts {@code serverIndicesSortedByRegionCount} by each server's current region count. */
    void sortServersByRegionCount() {
      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
    }
919
    /** Returns the number of regions currently hosted by the server at {@code server} index. */
    int getNumRegions(int server) {
      return regionsPerServer[server].length;
    }
923
    /**
     * Returns true if {@code arr} contains {@code val}.
     * NOTE: uses binary search, so {@code arr} MUST be sorted ascending — this
     * holds for the primariesOfRegionsPer* arrays, which are kept sorted via
     * addRegionSorted()/removeRegion().
     */
    boolean contains(int[] arr, int val) {
      return Arrays.binarySearch(arr, val) >= 0;
    }
927
    // Orders server indices by how many regions each server currently hosts.
    private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
929
930    int getLowestLocalityRegionOnServer(int serverIndex) {
931      if (regionFinder != null) {
932        float lowestLocality = 1.0f;
933        int lowestLocalityRegionIndex = -1;
934        if (regionsPerServer[serverIndex].length == 0) {
935          // No regions on that region server
936          return -1;
937        }
938        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
939          int regionIndex = regionsPerServer[serverIndex][j];
940          HDFSBlocksDistribution distribution = regionFinder
941              .getBlockDistribution(regions[regionIndex]);
942          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
943          // skip empty region
944          if (distribution.getUniqueBlocksTotalWeight() == 0) {
945            continue;
946          }
947          if (locality < lowestLocality) {
948            lowestLocality = locality;
949            lowestLocalityRegionIndex = j;
950          }
951        }
952        if (lowestLocalityRegionIndex == -1) {
953          return -1;
954        }
955        if (LOG.isTraceEnabled()) {
956          LOG.trace("Lowest locality region is "
957              + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
958                  .getRegionNameAsString() + " with locality " + lowestLocality
959              + " and its region server contains " + regionsPerServer[serverIndex].length
960              + " regions");
961        }
962        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
963      } else {
964        return -1;
965      }
966    }
967
968    float getLocalityOfRegion(int region, int server) {
969      if (regionFinder != null) {
970        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
971        return distribution.getBlockLocalityIndex(servers[server].getHostname());
972      } else {
973        return 0f;
974      }
975    }
976
    /** Test hook: overrides the region count tracked by this cluster state. */
    @VisibleForTesting
    protected void setNumRegions(int numRegions) {
      this.numRegions = numRegions;
    }
981
    /** Test hook: overrides the moved-region count tracked by this cluster state. */
    @VisibleForTesting
    protected void setNumMovedRegions(int numMovedRegions) {
      this.numMovedRegions = numMovedRegions;
    }
986
987    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
988        justification="Not important but should be fixed")
989    @Override
990    public String toString() {
991      StringBuilder desc = new StringBuilder("Cluster={servers=[");
992      for(ServerName sn:servers) {
993        desc.append(sn.getHostAndPort()).append(", ");
994      }
995      desc.append("], serverIndicesSortedByRegionCount=")
996          .append(Arrays.toString(serverIndicesSortedByRegionCount))
997          .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));
998
999      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
1000          .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
1001          .append(", numTables=").append(numTables).append(", numMovedRegions=")
1002          .append(numMovedRegions).append('}');
1003      return desc.toString();
1004    }
1005  }
1006
  // slop for regions
  protected float slop;
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  protected float overallSlop;
  // Active configuration; replaced in setConf().
  protected Configuration config = HBaseConfiguration.create();
  // Maps servers to racks; replaced in setConf()/setRackManager().
  protected RackManager rackManager;
  // Shared RNG used for random server selection during assignment.
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
  protected MetricsBalancer metricsBalancer = null;
  // Latest cluster metrics pushed in via setClusterMetrics().
  protected ClusterMetrics clusterStatus = null;
  // Name of the active master; set in setMasterServices().
  protected ServerName masterServerName;
  protected MasterServices services;
  // True when system tables must be hosted on the master regionserver.
  protected boolean onlySystemTablesOnMaster;
  // Latched on in setMasterServices() when the cluster is in maintenance mode.
  protected boolean maintenanceMode;
1021
1022  @Override
1023  public void setConf(Configuration conf) {
1024    this.config = conf;
1025    setSlop(conf);
1026    if (slop < 0) slop = 0;
1027    else if (slop > 1) slop = 1;
1028
1029    if (overallSlop < 0) overallSlop = 0;
1030    else if (overallSlop > 1) overallSlop = 1;
1031
1032    this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config);
1033
1034    this.rackManager = new RackManager(getConf());
1035    if (useRegionFinder) {
1036      regionFinder.setConf(conf);
1037    }
1038    // Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
1039    LOG.info("slop={}, systemTablesOnMaster={}",
1040        this.slop, this.onlySystemTablesOnMaster);
1041  }
1042
  /**
   * Reads the region slop ("hbase.regions.slop", default 0.2) and the overall
   * slop ("hbase.regions.overallSlop", defaulting to the region slop) from conf.
   */
  protected void setSlop(Configuration conf) {
    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
  }
1047
1048  /**
1049   * Check if a region belongs to some system table.
1050   * If so, the primary replica may be expected to be put on the master regionserver.
1051   */
1052  public boolean shouldBeOnMaster(RegionInfo region) {
1053    return (this.maintenanceMode || this.onlySystemTablesOnMaster)
1054        && region.getTable().isSystemTable();
1055  }
1056
1057  /**
1058   * Balance the regions that should be on master regionserver.
1059   */
  protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) {
    if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null;
    List<RegionPlan> plans = null;
    // Phase 1: evict non-system regions currently on the master, spreading them
    // round-robin across the other servers via a wrapping key iterator.
    List<RegionInfo> regions = clusterMap.get(masterServerName);
    if (regions != null) {
      Iterator<ServerName> keyIt = null;
      for (RegionInfo region: regions) {
        if (shouldBeOnMaster(region)) continue;

        // Find a non-master regionserver to host the region
        if (keyIt == null || !keyIt.hasNext()) {
          keyIt = clusterMap.keySet().iterator();
        }
        ServerName dest = keyIt.next();
        if (masterServerName.equals(dest)) {
          // skip the master itself; wrap the iterator if it ran out
          if (!keyIt.hasNext()) {
            keyIt = clusterMap.keySet().iterator();
          }
          dest = keyIt.next();
        }

        // Move this region away from the master regionserver
        RegionPlan plan = new RegionPlan(region, masterServerName, dest);
        if (plans == null) {
          plans = new ArrayList<>();
        }
        plans.add(plan);
      }
    }
    // Phase 2: pull system regions hosted elsewhere back onto the master.
    for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) {
      if (masterServerName.equals(server.getKey())) continue;
      for (RegionInfo region: server.getValue()) {
        if (!shouldBeOnMaster(region)) continue;

        // Move this region to the master regionserver
        RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName);
        if (plans == null) {
          plans = new ArrayList<>();
        }
        plans.add(plan);
      }
    }
    // null (not empty) when no moves are needed
    return plans;
  }
1104
1105  /**
1106   * If master is configured to carry system tables only, in here is
1107   * where we figure what to assign it.
1108   */
1109  protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions(
1110      Collection<RegionInfo> regions, List<ServerName> servers) {
1111    if (servers == null || regions == null || regions.isEmpty()) {
1112      return null;
1113    }
1114    Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
1115    if (this.maintenanceMode || this.onlySystemTablesOnMaster) {
1116      if (masterServerName != null && servers.contains(masterServerName)) {
1117        assignments.put(masterServerName, new ArrayList<>());
1118        for (RegionInfo region : regions) {
1119          if (shouldBeOnMaster(region)) {
1120            assignments.get(masterServerName).add(region);
1121          }
1122        }
1123      }
1124    }
1125    return assignments;
1126  }
1127
  /** Returns the configuration currently in use by this balancer. */
  @Override
  public Configuration getConf() {
    return this.config;
  }
1132
  /** Caches the latest cluster metrics and forwards them to the region finder, if enabled. */
  @Override
  public synchronized void setClusterMetrics(ClusterMetrics st) {
    this.clusterStatus = st;
    if (useRegionFinder) {
      regionFinder.setClusterMetrics(st);
    }
  }
1140
  /** Intentional no-op in the base balancer; subclasses may track per-table cluster load. */
  @Override
  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad){

  }
1145
1146  @Override
1147  public void setMasterServices(MasterServices masterServices) {
1148    masterServerName = masterServices.getServerName();
1149    this.services = masterServices;
1150    if (useRegionFinder) {
1151      this.regionFinder.setServices(masterServices);
1152    }
1153    if (this.services.isInMaintenanceMode()) {
1154      this.maintenanceMode = true;
1155    }
1156  }
1157
  /**
   * Warms up the region finder's HDFS block-distribution cache for all currently
   * assigned regions once the master has started. Failures are logged and ignored.
   */
  @Override
  public void postMasterStartupInitialize() {
    if (services != null && regionFinder != null) {
      try {
        Set<RegionInfo> regions =
            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
        regionFinder.refreshAndWait(regions);
      } catch (Exception e) {
        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
      }
    }
  }
1170
  /** Replaces the rack manager used to map servers to racks. */
  public void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }
1174
  /**
   * Decides whether the balancer should run on the given cluster state: requires
   * enough servers, and either co-located region replicas or a max/min server
   * load outside the slop band around the average load.
   */
  protected boolean needsBalance(Cluster c) {
    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not running balancer because only " + cs.getNumServers()
            + " active regionserver(s)");
      }
      return false;
    }
    // co-located replicas always justify a balancing run
    if(areSomeRegionReplicasColocated(c)) return true;
    // Check if we even need to do any load balancing
    // HBASE-3681 check sloppiness first
    float average = cs.getLoadAverage(); // for logging
    int floor = (int) Math.floor(average * (1 - slop));
    int ceiling = (int) Math.ceil(average * (1 + slop));
    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because balanced cluster; " +
          "servers=" + cs.getNumServers() +
          " regions=" + cs.getNumRegions() + " average=" + average +
          " mostloaded=" + serversByLoad.lastKey().getLoad() +
          " leastloaded=" + serversByLoad.firstKey().getLoad());
      }
      return false;
    }
    return true;
  }
1204
1205  /**
1206   * Subclasses should implement this to return true if the cluster has nodes that hosts
1207   * multiple replicas for the same region, or, if there are multiple racks and the same
1208   * rack hosts replicas of the same region
1209   * @param c Cluster information
1210   * @return whether region replicas are currently co-located
1211   */
  protected boolean areSomeRegionReplicasColocated(Cluster c) {
    // Base implementation is replica-unaware; replica-aware subclasses override.
    return false;
  }
1215
1216  /**
1217   * Generates a bulk assignment plan to be used on cluster startup using a
1218   * simple round-robin assignment.
1219   * <p>
1220   * Takes a list of all the regions and all the servers in the cluster and
1221   * returns a map of each server to the regions that it should be assigned.
1222   * <p>
1223   * Currently implemented as a round-robin assignment. Same invariant as load
1224   * balancing, all servers holding floor(avg) or ceiling(avg).
1225   *
1226   * TODO: Use block locations from HDFS to place regions with their blocks
1227   *
1228   * @param regions all regions
1229   * @param servers all servers
1230   * @return map of server to the regions it should take, or null if no
1231   *         assignment is possible (ie. no regions or no servers)
1232   */
  @Override
  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
      List<ServerName> servers) throws HBaseIOException {
    metricsBalancer.incrMiscInvocations();
    // Pin system regions to the master first (when so configured), and exclude
    // both the master and its regions from the round-robin pass below.
    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
    if (assignments != null && !assignments.isEmpty()) {
      servers = new ArrayList<>(servers);
      // Guarantee not to put other regions on master
      servers.remove(masterServerName);
      List<RegionInfo> masterRegions = assignments.get(masterServerName);
      if (!masterRegions.isEmpty()) {
        regions = new ArrayList<>(regions);
        regions.removeAll(masterRegions);
      }
    }
    if (this.maintenanceMode || regions == null || regions.isEmpty()) {
      return assignments;
    }

    int numServers = servers == null ? 0 : servers.size();
    if (numServers == 0) {
      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
      return null;
    }

    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
    // and balanced. This should also run fast with fewer number of iterations.

    if (numServers == 1) { // Only one server, nothing fancy we can do here
      ServerName server = servers.get(0);
      assignments.put(server, new ArrayList<>(regions));
      return assignments;
    }

    // First pass: plain round-robin; regions whose placement would lower replica
    // availability are deferred into unassignedRegions.
    Cluster cluster = createCluster(servers, regions);
    List<RegionInfo> unassignedRegions = new ArrayList<>();

    roundRobinAssignment(cluster, regions, unassignedRegions,
      servers, assignments);

    List<RegionInfo> lastFewRegions = new ArrayList<>();
    // assign the remaining by going through the list and try to assign to servers one-by-one
    int serverIdx = RANDOM.nextInt(numServers);
    OUTER : for (RegionInfo region : unassignedRegions) {
      boolean assigned = false;
      INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
        ServerName serverName = servers.get((j + serverIdx) % numServers);
        if (!cluster.wouldLowerAvailability(region, serverName)) {
          List<RegionInfo> serverRegions =
              assignments.computeIfAbsent(serverName, k -> new ArrayList<>());
          if (!RegionReplicaUtil.isDefaultReplica(region.getReplicaId())) {
            // if the region is not a default replica
            // check if the assignments map has the other replica region on this server
            for (RegionInfo hri : serverRegions) {
              if (RegionReplicaUtil.isReplicasForSameRegion(region, hri)) {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("Skipping the server, " + serverName
                      + " , got the same server for the region " + region);
                }
                // do not allow this case. The unassignedRegions we got because the
                // replica region in this list was not assigned because of lower availablity issue.
                // So when we assign here we should ensure that as far as possible the server being
                // selected does not have the server where the replica region was not assigned.
                continue INNER; // continue the inner loop, ie go to the next server
              }
            }
          }
          serverRegions.add(region);
          cluster.doAssignRegion(region, serverName);
          serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
          assigned = true;
          break;
        }
      }
      if (!assigned) {
        lastFewRegions.add(region);
      }
    }
    // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
    // make it optimal later. we can end up with this if numReplicas > numServers.
    for (RegionInfo region : lastFewRegions) {
      int i = RANDOM.nextInt(numServers);
      ServerName server = servers.get(i);
      List<RegionInfo> serverRegions = assignments.computeIfAbsent(server, k -> new ArrayList<>());
      serverRegions.add(region);
      cluster.doAssignRegion(region, server);
    }
    return assignments;
  }
1324
  /**
   * Builds an in-memory {@link Cluster} snapshot for the given servers and
   * regions. When any of the regions' tables uses region replication, the whole
   * cluster's assignments are snapshotted so replica placement can be checked;
   * otherwise only the assignments of the passed regions are loaded (cheaper).
   * @throws HBaseIOException if table descriptors cannot be read
   */
  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions)
      throws HBaseIOException {
    boolean hasRegionReplica = false;
    try {
      if (services != null && services.getTableDescriptors() != null) {
        Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
        for (RegionInfo regionInfo : regions) {
          TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
          if (td != null && td.getRegionReplication() > 1) {
            hasRegionReplica = true;
            break;
          }
        }
      }
    } catch (IOException ioe) {
      throw new HBaseIOException(ioe);
    }

    // Get the snapshot of the current assignments for the regions in question, and then create
    // a cluster out of it. Note that we might have replicas already assigned to some servers
    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
    // replicas of the regions that are passed (for performance).
    Map<ServerName, List<RegionInfo>> clusterState = null;
    if (!hasRegionReplica) {
      clusterState = getRegionAssignmentsByServer(regions);
    } else {
      // for the case where we have region replica it is better we get the entire cluster's snapshot
      clusterState = getRegionAssignmentsByServer(null);
    }

    // make sure every requested server appears in the snapshot, even if idle
    for (ServerName server : servers) {
      if (!clusterState.containsKey(server)) {
        clusterState.put(server, EMPTY_REGION_LIST);
      }
    }
    return new Cluster(regions, clusterState, null, this.regionFinder,
        rackManager);
  }
1363
  /** Filters {@code servers} down to those the server manager's idle predicate accepts. */
  private List<ServerName> findIdleServers(List<ServerName> servers) {
    return this.services.getServerManager()
            .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
  }
1368
1369  /**
1370   * Used to assign a single region to a random server.
1371   */
1372  @Override
1373  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
1374      throws HBaseIOException {
1375    metricsBalancer.incrMiscInvocations();
1376    if (servers != null && servers.contains(masterServerName)) {
1377      if (shouldBeOnMaster(regionInfo)) {
1378        return masterServerName;
1379      }
1380      if (!LoadBalancer.isTablesOnMaster(getConf())) {
1381        // Guarantee we do not put any regions on master
1382        servers = new ArrayList<>(servers);
1383        servers.remove(masterServerName);
1384      }
1385    }
1386
1387    int numServers = servers == null ? 0 : servers.size();
1388    if (numServers == 0) {
1389      LOG.warn("Wanted to retain assignment but no servers to assign to");
1390      return null;
1391    }
1392    if (numServers == 1) { // Only one server, nothing fancy we can do here
1393      return servers.get(0);
1394    }
1395    List<ServerName> idleServers = findIdleServers(servers);
1396    if (idleServers.size() == 1) {
1397      return idleServers.get(0);
1398    }
1399    final List<ServerName> finalServers = idleServers.isEmpty() ?
1400            servers : idleServers;
1401    List<RegionInfo> regions = Lists.newArrayList(regionInfo);
1402    Cluster cluster = createCluster(finalServers, regions);
1403    return randomAssignment(cluster, regionInfo, finalServers);
1404  }
1405
1406  /**
1407   * Generates a bulk assignment startup plan, attempting to reuse the existing
1408   * assignment information from META, but adjusting for the specified list of
1409   * available/online servers available for assignment.
1410   * <p>
1411   * Takes a map of all regions to their existing assignment from META. Also
1412   * takes a list of online servers for regions to be assigned to. Attempts to
1413   * retain all assignment, so in some instances initial assignment will not be
1414   * completely balanced.
1415   * <p>
1416   * Any leftover regions without an existing server to be assigned to will be
1417   * assigned randomly to available servers.
1418   *
1419   * @param regions regions and existing assignment from meta
1420   * @param servers available servers
1421   * @return map of servers and regions to be assigned to them
1422   */
1423  @Override
1424  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
1425      List<ServerName> servers) throws HBaseIOException {
1426    // Update metrics
1427    metricsBalancer.incrMiscInvocations();
1428    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions.keySet(), servers);
1429    if (assignments != null && !assignments.isEmpty()) {
1430      servers = new ArrayList<>(servers);
1431      // Guarantee not to put other regions on master
1432      servers.remove(masterServerName);
1433      List<RegionInfo> masterRegions = assignments.get(masterServerName);
1434      regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey()))
1435          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
1436    }
1437    if (this.maintenanceMode || regions.isEmpty()) {
1438      return assignments;
1439    }
1440
1441    int numServers = servers == null ? 0 : servers.size();
1442    if (numServers == 0) {
1443      LOG.warn("Wanted to do retain assignment but no servers to assign to");
1444      return null;
1445    }
1446    if (numServers == 1) { // Only one server, nothing fancy we can do here
1447      ServerName server = servers.get(0);
1448      assignments.put(server, new ArrayList<>(regions.keySet()));
1449      return assignments;
1450    }
1451
1452    // Group all of the old assignments by their hostname.
1453    // We can't group directly by ServerName since the servers all have
1454    // new start-codes.
1455
1456    // Group the servers by their hostname. It's possible we have multiple
1457    // servers on the same host on different ports.
1458    ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
1459    for (ServerName server : servers) {
1460      assignments.put(server, new ArrayList<>());
1461      serversByHostname.put(server.getHostnameLowerCase(), server);
1462    }
1463
1464    // Collection of the hostnames that used to have regions
1465    // assigned, but for which we no longer have any RS running
1466    // after the cluster restart.
1467    Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
1468
1469    // If the old servers aren't present, lets assign those regions later.
1470    List<RegionInfo> randomAssignRegions = Lists.newArrayList();
1471
1472    int numRandomAssignments = 0;
1473    int numRetainedAssigments = 0;
1474    for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
1475      RegionInfo region = entry.getKey();
1476      ServerName oldServerName = entry.getValue();
1477      List<ServerName> localServers = new ArrayList<>();
1478      if (oldServerName != null) {
1479        localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
1480      }
1481      if (localServers.isEmpty()) {
1482        // No servers on the new cluster match up with this hostname, assign randomly, later.
1483        randomAssignRegions.add(region);
1484        if (oldServerName != null) {
1485          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
1486        }
1487      } else if (localServers.size() == 1) {
1488        // the usual case - one new server on same host
1489        ServerName target = localServers.get(0);
1490        assignments.get(target).add(region);
1491        numRetainedAssigments++;
1492      } else {
1493        // multiple new servers in the cluster on this same host
1494        if (localServers.contains(oldServerName)) {
1495          assignments.get(oldServerName).add(region);
1496          numRetainedAssigments++;
1497        } else {
1498          ServerName target = null;
1499          for (ServerName tmp : localServers) {
1500            if (tmp.getPort() == oldServerName.getPort()) {
1501              target = tmp;
1502              assignments.get(tmp).add(region);
1503              numRetainedAssigments++;
1504              break;
1505            }
1506          }
1507          if (target == null) {
1508            randomAssignRegions.add(region);
1509          }
1510        }
1511      }
1512    }
1513
1514    // If servers from prior assignment aren't present, then lets do randomAssignment on regions.
1515    if (randomAssignRegions.size() > 0) {
1516      Cluster cluster = createCluster(servers, regions.keySet());
1517      for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
1518        ServerName sn = entry.getKey();
1519        for (RegionInfo region : entry.getValue()) {
1520          cluster.doAssignRegion(region, sn);
1521        }
1522      }
1523      for (RegionInfo region : randomAssignRegions) {
1524        ServerName target = randomAssignment(cluster, region, servers);
1525        assignments.get(target).add(region);
1526        numRandomAssignments++;
1527      }
1528    }
1529
1530    String randomAssignMsg = "";
1531    if (numRandomAssignments > 0) {
1532      randomAssignMsg =
1533          numRandomAssignments + " regions were assigned "
1534              + "to random hosts, since the old hosts for these regions are no "
1535              + "longer present in the cluster. These hosts were:\n  "
1536              + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
1537    }
1538
1539    LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
1540        + " retained the pre-restart assignment. " + randomAssignMsg);
1541    return assignments;
1542  }
1543
  /** No-op: this balancer performs no additional initialization beyond construction. */
  @Override
  public void initialize() throws HBaseIOException{
  }
1547
  /** No-op: this implementation ignores per-region online notifications. */
  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }
1551
  /** No-op: this implementation ignores per-region offline notifications. */
  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }
1555
  /** Returns true once {@link #stop(String)} has been invoked. */
  @Override
  public boolean isStopped() {
    return stopped;
  }
1560
  /**
   * Marks this balancer as stopped. Logs the reason, then flips the {@code stopped}
   * flag that {@link #isStopped()} reports.
   * @param why human-readable reason for the stop request (logged only)
   */
  @Override
  public void stop(String why) {
    LOG.info("Load Balancer stop requested: "+why);
    stopped = true;
  }
1566
1567  /**
1568  * Updates the balancer status tag reported to JMX
1569  */
1570  public void updateBalancerStatus(boolean status) {
1571    metricsBalancer.balancerStatus(status);
1572  }
1573
1574  /**
1575   * Used to assign a single region to a random server.
1576   */
1577  private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
1578      List<ServerName> servers) {
1579    int numServers = servers.size(); // servers is not null, numServers > 1
1580    ServerName sn = null;
1581    final int maxIterations = numServers * 4;
1582    int iterations = 0;
1583    List<ServerName> usedSNs = new ArrayList<>(servers.size());
1584    do {
1585      int i = RANDOM.nextInt(numServers);
1586      sn = servers.get(i);
1587      if (!usedSNs.contains(sn)) {
1588        usedSNs.add(sn);
1589      }
1590    } while (cluster.wouldLowerAvailability(regionInfo, sn)
1591        && iterations++ < maxIterations);
1592    if (iterations >= maxIterations) {
1593      // We have reached the max. Means the servers that we collected is still lowering the
1594      // availability
1595      for (ServerName unusedServer : servers) {
1596        if (!usedSNs.contains(unusedServer)) {
1597          // check if any other unused server is there for us to use.
1598          // If so use it. Else we have not other go but to go with one of them
1599          if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
1600            sn = unusedServer;
1601            break;
1602          }
1603        }
1604      }
1605    }
1606    cluster.doAssignRegion(regionInfo, sn);
1607    return sn;
1608  }
1609
1610  /**
1611   * Round robin a list of regions to a list of servers
1612   */
1613  private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
1614      List<RegionInfo> unassignedRegions, List<ServerName> servers,
1615      Map<ServerName, List<RegionInfo>> assignments) {
1616
1617    int numServers = servers.size();
1618    int numRegions = regions.size();
1619    int max = (int) Math.ceil((float) numRegions / numServers);
1620    int serverIdx = 0;
1621    if (numServers > 1) {
1622      serverIdx = RANDOM.nextInt(numServers);
1623    }
1624    int regionIdx = 0;
1625
1626    for (int j = 0; j < numServers; j++) {
1627      ServerName server = servers.get((j + serverIdx) % numServers);
1628      List<RegionInfo> serverRegions = new ArrayList<>(max);
1629      for (int i = regionIdx; i < numRegions; i += numServers) {
1630        RegionInfo region = regions.get(i % numRegions);
1631        if (cluster.wouldLowerAvailability(region, server)) {
1632          unassignedRegions.add(region);
1633        } else {
1634          serverRegions.add(region);
1635          cluster.doAssignRegion(region, server);
1636        }
1637      }
1638      assignments.put(server, serverRegions);
1639      regionIdx++;
1640    }
1641  }
1642
1643  protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
1644    Collection<RegionInfo> regions) {
1645    if (this.services != null && this.services.getAssignmentManager() != null) {
1646      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
1647    } else {
1648      return new HashMap<>();
1649    }
1650  }
1651
  /** No-op: this implementation does not react to runtime configuration changes. */
  @Override
  public void onConfigurationChange(Configuration conf) {
  }
1655}