001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.TimeUnit;
031import java.util.function.Supplier;
032import org.agrona.collections.Hashing;
033import org.agrona.collections.Int2IntCounterMap;
034import org.apache.hadoop.hbase.HDFSBlocksDistribution;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionReplicaUtil;
038import org.apache.hadoop.hbase.master.RackManager;
039import org.apache.hadoop.hbase.net.Address;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.util.Pair;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
047
048/**
 * An efficient array-based implementation similar to ClusterState for keeping the status of the
 * cluster in terms of region assignment and distribution. Load balancers such as
 * StochasticLoadBalancer use this class because hundreds of thousands of hashmap manipulations
 * would be very costly, which is why this class mostly uses indexes and arrays.
053 * <p/>
054 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
055 * topology in terms of server names, hostnames and racks.
056 */
057@InterfaceAudience.Private
058class BalancerClusterState {
059
060  private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);
061
  // serverIndex -> ServerName. When several ServerNames share a host and port, the constructor
  // keeps the one with the newest startcode.
  ServerName[] servers;
  // ServerName uniquely identifies a region server. Multiple RS can run on the same host.
  // hostIndex -> hostname
  String[] hosts;
  // rackIndex -> rack name
  String[] racks;
  boolean multiServersPerHost = false; // whether or not any host has more than one server

  // tableIndex -> table name, in order of first appearance
  ArrayList<String> tables;
  // regionIndex -> region
  RegionInfo[] regions;
  // regionIndex -> load history found for the region (null when none was found)
  Deque<BalancerRegionLoad>[] regionLoads;
  private RegionHDFSBlockLocationFinder regionFinder;

  // regionIndex -> serverIndex of each top block location (-1 when the location is unknown),
  // in the order returned by the location finder
  int[][] regionLocations;

  int[] serverIndexToHostIndex; // serverIndex -> host index
  int[] serverIndexToRackIndex; // serverIndex -> rack index

  int[][] regionsPerServer; // serverIndex -> region list
  // serverIndex -> offset of region list; the constructor uses it as the next free slot while
  // filling regionsPerServer
  int[] serverIndexToRegionsOffset;
  // hostIndex -> list of regions (the constructor only fills it when multiServersPerHost)
  int[][] regionsPerHost;
  // rackIndex -> region list (the constructor only fills it when there is more than one rack)
  int[][] regionsPerRack;
  // serverIndex -> counts of colocated replicas by primary region index
  Int2IntCounterMap[] colocatedReplicaCountsPerServer;
  // hostIndex -> counts of colocated replicas by primary region index
  Int2IntCounterMap[] colocatedReplicaCountsPerHost;
  // rackIndex -> counts of colocated replicas by primary region index
  Int2IntCounterMap[] colocatedReplicaCountsPerRack;

  int[][] serversPerHost; // hostIndex -> list of server indexes
  int[][] serversPerRack; // rackIndex -> list of server indexes
  int[] regionIndexToServerIndex; // regionIndex -> serverIndex (-1 for unassigned regions)
  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
  int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
  int[] numRegionsPerTable; // tableIndex -> count of assigned regions
  double[] meanRegionsPerTable; // tableIndex -> mean region count per server
  // regionIndex -> regionIndex of the primary (-1 if the primary is not part of this state)
  int[] regionIndexToPrimaryIndex;
  boolean hasRegionReplicas = false; // whether there are regions with replicas

  // Server indexes; the constructor initializes both in plain index order.
  Integer[] serverIndicesSortedByRegionCount;
  Integer[] serverIndicesSortedByLocality;

  // Server address -> serverIndex; ServerNames with the same host and port share an index.
  Map<Address, Integer> serversToIndex;
  Map<String, Integer> hostsToIndex; // hostname -> hostIndex
  Map<String, Integer> racksToIndex; // rack name -> rackIndex
  Map<String, Integer> tablesToIndex; // table name -> tableIndex
  Map<RegionInfo, Integer> regionsToIndex; // region -> regionIndex
  float[] localityPerServer;

  int numServers;
  int numHosts;
  int numRacks;
  int numTables;
  int numRegions;
  int maxReplicas = 1; // highest (replicaId + 1) among registered regions, at least 1

  int numMovedRegions = 0; // num moved regions from the initial configuration
  Map<ServerName, List<RegionInfo>> clusterState;

  private final RackManager rackManager;
  // Maps region -> rackIndex -> locality of region on rack
  private float[][] rackLocalities;
  // Maps localityType -> region -> [server|rack]Index with highest locality
  private int[][] regionsToMostLocalEntities;
  // Maps (regionIndex, serverIndex) -> regionCacheRatio of a region on a server
  private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
  // Maps regionIndex -> serverIndex with best region cache ratio
  private int[] regionServerIndexWithBestRegionCachedRatio;
  // Maps encoded region name -> (old ServerName, cache ratio of the region on that old server)
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
  // cache free space available on each server, aligned to the "servers" array indices
  long[] serverBlockCacheFreeSize;

  // A shuffled copy of all server indexes. The result is memoized and recomputed (reshuffled)
  // on the first access after it is older than 5 seconds.
  private final Supplier<List<Integer>> shuffledServerIndicesSupplier =
    Suppliers.memoizeWithExpiration(() -> {
      Collection<Integer> serverIndices = serversToIndex.values();
      List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices);
      Collections.shuffle(shuffledServerIndices);
      return shuffledServerIndices;
    }, 5, TimeUnit.SECONDS);
  // NOTE(review): presumably the time at which a stop of the balancing run was requested;
  // Long.MAX_VALUE appears to mean "no stop requested". Confirm against its users outside view.
  private long stopRequestedAt = Long.MAX_VALUE;
142
143  static class DefaultRackManager extends RackManager {
144    @Override
145    public String getRack(ServerName server) {
146      return UNKNOWN_RACK;
147    }
148  }
149
150  BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
151    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
152    RackManager rackManager) {
153    this(null, clusterState, loads, regionFinder, rackManager, null, null);
154  }
155
156  protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
157    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
158    RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio,
159    Map<ServerName, Long> serverBlockCacheFreeByServer) {
160    this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio,
161      serverBlockCacheFreeByServer);
162  }
163
164  @SuppressWarnings("unchecked")
165  BalancerClusterState(Collection<RegionInfo> unassignedRegions,
166    Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
167    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
168    Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio,
169    Map<ServerName, Long> serverBlockCacheFreeByServer) {
170    if (unassignedRegions == null) {
171      unassignedRegions = Collections.emptyList();
172    }
173
174    serversToIndex = new HashMap<>();
175    hostsToIndex = new HashMap<>();
176    racksToIndex = new HashMap<>();
177    tablesToIndex = new HashMap<>();
178
179    // TODO: We should get the list of tables from master
180    tables = new ArrayList<>();
181    this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
182
183    this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;
184
185    numRegions = 0;
186
187    List<List<Integer>> serversPerHostList = new ArrayList<>();
188    List<List<Integer>> serversPerRackList = new ArrayList<>();
189    this.clusterState = clusterState;
190    this.regionFinder = regionFinder;
191
192    // Use servername and port as there can be dead servers in this list. We want everything with
193    // a matching hostname and port to have the same index.
194    for (ServerName sn : clusterState.keySet()) {
195      if (sn == null) {
196        LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); "
197          + "skipping; unassigned regions?");
198        if (LOG.isTraceEnabled()) {
199          LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
200        }
201        continue;
202      }
203      if (serversToIndex.get(sn.getAddress()) == null) {
204        serversToIndex.put(sn.getAddress(), numServers++);
205      }
206      if (!hostsToIndex.containsKey(sn.getHostname())) {
207        hostsToIndex.put(sn.getHostname(), numHosts++);
208        serversPerHostList.add(new ArrayList<>(1));
209      }
210
211      int serverIndex = serversToIndex.get(sn.getAddress());
212      int hostIndex = hostsToIndex.get(sn.getHostname());
213      serversPerHostList.get(hostIndex).add(serverIndex);
214
215      String rack = this.rackManager.getRack(sn);
216
217      if (!racksToIndex.containsKey(rack)) {
218        racksToIndex.put(rack, numRacks++);
219        serversPerRackList.add(new ArrayList<>());
220      }
221      int rackIndex = racksToIndex.get(rack);
222      serversPerRackList.get(rackIndex).add(serverIndex);
223    }
224
225    LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);
226    // Count how many regions there are.
227    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
228      numRegions += entry.getValue().size();
229    }
230    numRegions += unassignedRegions.size();
231
232    regionsToIndex = new HashMap<>(numRegions);
233    servers = new ServerName[numServers];
234    serversPerHost = new int[numHosts][];
235    serversPerRack = new int[numRacks][];
236    regions = new RegionInfo[numRegions];
237    regionIndexToServerIndex = new int[numRegions];
238    initialRegionIndexToServerIndex = new int[numRegions];
239    regionIndexToTableIndex = new int[numRegions];
240    regionIndexToPrimaryIndex = new int[numRegions];
241    regionLoads = new Deque[numRegions];
242
243    regionLocations = new int[numRegions][];
244    serverIndicesSortedByRegionCount = new Integer[numServers];
245    serverIndicesSortedByLocality = new Integer[numServers];
246    localityPerServer = new float[numServers];
247
248    serverIndexToHostIndex = new int[numServers];
249    serverIndexToRackIndex = new int[numServers];
250    regionsPerServer = new int[numServers][];
251    serverIndexToRegionsOffset = new int[numServers];
252    regionsPerHost = new int[numHosts][];
253    regionsPerRack = new int[numRacks][];
254    colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers];
255    colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts];
256    colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks];
257
258    int regionIndex = 0, regionPerServerIndex = 0;
259
260    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
261      if (entry.getKey() == null) {
262        LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
263        continue;
264      }
265      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
266
267      // keep the servername if this is the first server name for this hostname
268      // or this servername has the newest startcode.
269      if (
270        servers[serverIndex] == null
271          || servers[serverIndex].getStartcode() < entry.getKey().getStartcode()
272      ) {
273        servers[serverIndex] = entry.getKey();
274      }
275
276      if (regionsPerServer[serverIndex] != null) {
277        // there is another server with the same hostAndPort in ClusterState.
278        // allocate the array for the total size
279        regionsPerServer[serverIndex] =
280          new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
281      } else {
282        regionsPerServer[serverIndex] = new int[entry.getValue().size()];
283      }
284      colocatedReplicaCountsPerServer[serverIndex] =
285        new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
286      serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
287      serverIndicesSortedByLocality[serverIndex] = serverIndex;
288    }
289
290    hosts = new String[numHosts];
291    for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) {
292      hosts[entry.getValue()] = entry.getKey();
293    }
294    racks = new String[numRacks];
295    for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) {
296      racks[entry.getValue()] = entry.getKey();
297    }
298
299    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
300      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
301      regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
302
303      int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
304      serverIndexToHostIndex[serverIndex] = hostIndex;
305
306      int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
307      serverIndexToRackIndex[serverIndex] = rackIndex;
308
309      for (RegionInfo region : entry.getValue()) {
310        registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
311        regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
312        regionIndex++;
313      }
314      serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
315    }
316
317    for (RegionInfo region : unassignedRegions) {
318      registerRegion(region, regionIndex, -1, loads, regionFinder);
319      regionIndex++;
320    }
321
322    if (LOG.isTraceEnabled()) {
323      for (int i = 0; i < numServers; i++) {
324        LOG.trace("server {} has {} regions", i, regionsPerServer[i].length);
325      }
326    }
327    for (int i = 0; i < serversPerHostList.size(); i++) {
328      serversPerHost[i] = new int[serversPerHostList.get(i).size()];
329      for (int j = 0; j < serversPerHost[i].length; j++) {
330        serversPerHost[i][j] = serversPerHostList.get(i).get(j);
331        LOG.trace("server {} is on host {}", serversPerHostList.get(i).get(j), i);
332      }
333      if (serversPerHost[i].length > 1) {
334        multiServersPerHost = true;
335      }
336    }
337
338    for (int i = 0; i < serversPerRackList.size(); i++) {
339      serversPerRack[i] = new int[serversPerRackList.get(i).size()];
340      for (int j = 0; j < serversPerRack[i].length; j++) {
341        serversPerRack[i][j] = serversPerRackList.get(i).get(j);
342        LOG.trace("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
343      }
344    }
345
346    numTables = tables.size();
347    LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts,
348      numRacks);
349    numRegionsPerServerPerTable = new int[numTables][numServers];
350    numRegionsPerTable = new int[numTables];
351
352    for (int i = 0; i < numTables; i++) {
353      for (int j = 0; j < numServers; j++) {
354        numRegionsPerServerPerTable[i][j] = 0;
355      }
356    }
357
358    for (int i = 0; i < regionIndexToServerIndex.length; i++) {
359      if (regionIndexToServerIndex[i] >= 0) {
360        numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++;
361        numRegionsPerTable[regionIndexToTableIndex[i]]++;
362      }
363    }
364
365    // Avoid repeated computation for planning
366    meanRegionsPerTable = new double[numTables];
367
368    for (int i = 0; i < numTables; i++) {
369      meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
370    }
371
372    for (int i = 0; i < regions.length; i++) {
373      RegionInfo info = regions[i];
374      if (RegionReplicaUtil.isDefaultReplica(info)) {
375        regionIndexToPrimaryIndex[i] = i;
376      } else {
377        hasRegionReplicas = true;
378        RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
379        regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
380      }
381    }
382
383    for (int i = 0; i < regionsPerServer.length; i++) {
384      colocatedReplicaCountsPerServer[i] =
385        new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
386      for (int j = 0; j < regionsPerServer[i].length; j++) {
387        int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
388        colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex);
389      }
390    }
391    // compute regionsPerHost
392    if (multiServersPerHost) {
393      populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost,
394        serversPerHost);
395    }
396
397    // compute regionsPerRack
398    if (numRacks > 1) {
399      populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
400        serversPerRack);
401    }
402
403    this.serverBlockCacheFreeSize = new long[numServers];
404    if (serverBlockCacheFreeByServer != null) {
405      for (int i = 0; i < numServers; i++) {
406        ServerName sn = servers[i];
407        this.serverBlockCacheFreeSize[i] =
408          sn == null ? 0L : serverBlockCacheFreeByServer.getOrDefault(sn, 0L);
409      }
410    }
411  }
412
413  private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
414    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) {
415    for (int i = 0; i < serversPerLocation.length; i++) {
416      int numRegionsPerLocation = 0;
417      for (int j = 0; j < serversPerLocation[i].length; j++) {
418        numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length;
419      }
420      regionsPerLocation[i] = new int[numRegionsPerLocation];
421      colocatedReplicaCountsPerLocation[i] =
422        new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0);
423    }
424
425    for (int i = 0; i < serversPerLocation.length; i++) {
426      int numRegionPerLocationIndex = 0;
427      for (int j = 0; j < serversPerLocation[i].length; j++) {
428        for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) {
429          int region = regionsPerServer[serversPerLocation[i][j]][k];
430          regionsPerLocation[i][numRegionPerLocationIndex] = region;
431          int primaryIndex = regionIndexToPrimaryIndex[region];
432          colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex);
433          numRegionPerLocationIndex++;
434        }
435      }
436    }
437
438  }
439
440  /** Helper for Cluster constructor to handle a region */
441  private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
442    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) {
443    String tableName = region.getTable().getNameAsString();
444    if (!tablesToIndex.containsKey(tableName)) {
445      tables.add(tableName);
446      tablesToIndex.put(tableName, tablesToIndex.size());
447    }
448    int tableIndex = tablesToIndex.get(tableName);
449
450    regionsToIndex.put(region, regionIndex);
451    regions[regionIndex] = region;
452    regionIndexToServerIndex[regionIndex] = serverIndex;
453    initialRegionIndexToServerIndex[regionIndex] = serverIndex;
454    regionIndexToTableIndex[regionIndex] = tableIndex;
455
456    // region load
457    if (loads != null) {
458      Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
459      // That could have failed if the RegionLoad is using the other regionName
460      if (rl == null) {
461        // Try getting the region load using encoded name.
462        rl = loads.get(region.getEncodedName());
463      }
464      regionLoads[regionIndex] = rl;
465    }
466
467    if (regionFinder != null) {
468      // region location
469      List<ServerName> loc = regionFinder.getTopBlockLocations(region);
470      regionLocations[regionIndex] = new int[loc.size()];
471      for (int i = 0; i < loc.size(); i++) {
472        regionLocations[regionIndex][i] = loc.get(i) == null
473          ? -1
474          : (serversToIndex.get(loc.get(i).getAddress()) == null
475            ? -1
476            : serversToIndex.get(loc.get(i).getAddress()));
477      }
478    }
479
480    int numReplicas = region.getReplicaId() + 1;
481    if (numReplicas > maxReplicas) {
482      maxReplicas = numReplicas;
483    }
484  }
485
486  /**
487   * Returns true iff a given server has less regions than the balanced amount
488   */
489  public boolean serverHasTooFewRegions(int server) {
490    int minLoad = this.numRegions / numServers;
491    int numRegions = getNumRegions(server);
492    return numRegions < minLoad;
493  }
494
495  /**
496   * Retrieves and lazily initializes a field storing the locality of every region/server
497   * combination
498   */
499  public float[][] getOrComputeRackLocalities() {
500    if (rackLocalities == null || regionsToMostLocalEntities == null) {
501      computeCachedLocalities();
502    }
503    return rackLocalities;
504  }
505
506  /**
507   * Lazily initializes and retrieves a mapping of region -> server for which region has the highest
508   * the locality
509   */
510  public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) {
511    if (rackLocalities == null || regionsToMostLocalEntities == null) {
512      computeCachedLocalities();
513    }
514    return regionsToMostLocalEntities[type.ordinal()];
515  }
516
517  /**
518   * Looks up locality from cache of localities. Will create cache if it does not already exist.
519   */
520  public float getOrComputeLocality(int region, int entity,
521    BalancerClusterState.LocalityType type) {
522    switch (type) {
523      case SERVER:
524        return getLocalityOfRegion(region, entity);
525      case RACK:
526        return getOrComputeRackLocalities()[region][entity];
527      default:
528        throw new IllegalArgumentException("Unsupported LocalityType: " + type);
529    }
530  }
531
532  /**
533   * Returns locality weighted by region size in MB. Will create locality cache if it does not
534   * already exist.
535   */
536  public double getOrComputeWeightedLocality(int region, int server,
537    BalancerClusterState.LocalityType type) {
538    return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
539  }
540
541  /**
542   * Returns the size in MB from the most recent RegionLoad for region
543   */
544  public int getRegionSizeMB(int region) {
545    Deque<BalancerRegionLoad> load = regionLoads[region];
546    // This means regions have no actual data on disk
547    if (load == null) {
548      return 0;
549    }
550    return regionLoads[region].getLast().getStorefileSizeMB();
551  }
552
553  /**
554   * Computes and caches the locality for each region/rack combinations, as well as storing a
555   * mapping of region -> server and region -> rack such that server and rack have the highest
556   * locality for region
557   */
558  private void computeCachedLocalities() {
559    rackLocalities = new float[numRegions][numRacks];
560    regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
561
562    // Compute localities and find most local server per region
563    for (int region = 0; region < numRegions; region++) {
564      int serverWithBestLocality = 0;
565      float bestLocalityForRegion = 0;
566      for (int server = 0; server < numServers; server++) {
567        // Aggregate per-rack locality
568        float locality = getLocalityOfRegion(region, server);
569        int rack = serverIndexToRackIndex[server];
570        int numServersInRack = serversPerRack[rack].length;
571        rackLocalities[region][rack] += locality / numServersInRack;
572
573        if (locality > bestLocalityForRegion) {
574          serverWithBestLocality = server;
575          bestLocalityForRegion = locality;
576        }
577      }
578      regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
579
580      // Find most local rack per region
581      int rackWithBestLocality = 0;
582      float bestRackLocalityForRegion = 0.0f;
583      for (int rack = 0; rack < numRacks; rack++) {
584        float rackLocality = rackLocalities[region][rack];
585        if (rackLocality > bestRackLocalityForRegion) {
586          bestRackLocalityForRegion = rackLocality;
587          rackWithBestLocality = rack;
588        }
589      }
590      regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
591    }
592
593  }
594
595  /**
596   * Returns the size of hFiles from the most recent RegionLoad for region
597   */
598  public int getTotalRegionHFileSizeMB(int region) {
599    Deque<BalancerRegionLoad> load = regionLoads[region];
600    if (load == null) {
601      // This means, that the region has no actual data on disk
602      return 0;
603    }
604    return regionLoads[region].getLast().getRegionSizeMB();
605  }
606
607  /**
608   * Returns the weighted cache ratio of a region on the given region server
609   */
610  public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
611    return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server);
612  }
613
614  /**
615   * Returns the amount by which a region is cached on a given region server. If the region is not
616   * currently hosted on the given region server, then find out if it was previously hosted there
617   * and return the old cache ratio.
618   */
619  protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
620    float regionCacheRatio = 0.0f;
621
622    // Get the current region cache ratio if the region is hosted on the server regionServerIndex
623    for (int regionIndex : regionsPerServer[regionServerIndex]) {
624      if (region != regionIndex) {
625        continue;
626      }
627
628      Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];
629
630      // The region is currently hosted on this region server. Get the region cache ratio for this
631      // region on this server
632      regionCacheRatio =
633        regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio();
634
635      return regionCacheRatio;
636    }
637
638    // Region is not currently hosted on this server. Check if the region was cached on this
639    // server earlier. This can happen when the server was shutdown and the cache was persisted.
640    // Search using the region name and server name and not the index id and server id as these ids
641    // may change when a server is marked as dead or a new server is added.
642    String regionEncodedName = regions[region].getEncodedName();
643    ServerName serverName = servers[regionServerIndex];
644    if (
645      regionCacheRatioOnOldServerMap != null
646        && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
647    ) {
648      Pair<ServerName, Float> cacheRatioOfRegionOnServer =
649        regionCacheRatioOnOldServerMap.get(regionEncodedName);
650      if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) {
651        regionCacheRatio = cacheRatioOfRegionOnServer.getSecond();
652        if (LOG.isDebugEnabled()) {
653          LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName,
654            serverName, regionCacheRatio);
655        }
656      }
657    }
658    return regionCacheRatio;
659  }
660
661  /**
662   * Populate the maps containing information about how much a region is cached on a region server.
663   */
664  private void computeRegionServerRegionCacheRatio() {
665    regionIndexServerIndexRegionCachedRatio = new HashMap<>();
666    regionServerIndexWithBestRegionCachedRatio = new int[numRegions];
667
668    for (int region = 0; region < numRegions; region++) {
669      float bestRegionCacheRatio = 0.0f;
670      int serverWithBestRegionCacheRatio = 0;
671      for (int server = 0; server < numServers; server++) {
672        float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server);
673        if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) {
674          // A region with cache ratio 0 on a server means nothing. Hence, just make a note of
675          // cache ratio only if the cache ratio is greater than 0.
676          Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
677          regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio);
678        }
679        if (regionCacheRatio > bestRegionCacheRatio) {
680          serverWithBestRegionCacheRatio = server;
681          // If the server currently hosting the region has equal cache ratio to a historical
682          // server, consider the current server to keep hosting the region
683          bestRegionCacheRatio = regionCacheRatio;
684        } else if (
685          regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region]
686        ) {
687          // If two servers have same region cache ratio, then the server currently hosting the
688          // region
689          // should retain the region
690          serverWithBestRegionCacheRatio = server;
691        }
692      }
693      regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio;
694      Pair<Integer, Integer> regionServerPair =
695        new Pair<>(region, regionIndexToServerIndex[region]);
696      float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair);
697      if (tempRegionCacheRatio > bestRegionCacheRatio) {
698        LOG.warn(
699          "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the "
700            + "best region cache ratio {} on server {}",
701          regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
702          tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]);
703      }
704    }
705  }
706
707  protected float getOrComputeRegionCacheRatio(int region, int server) {
708    if (
709      regionServerIndexWithBestRegionCachedRatio == null
710        || regionIndexServerIndexRegionCachedRatio.isEmpty()
711    ) {
712      computeRegionServerRegionCacheRatio();
713    }
714
715    Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
716    return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair)
717      ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair)
718      : 0.0f;
719  }
720
721  public int[] getOrComputeServerWithBestRegionCachedRatio() {
722    if (
723      regionServerIndexWithBestRegionCachedRatio == null
724        || regionIndexServerIndexRegionCachedRatio.isEmpty()
725    ) {
726      computeRegionServerRegionCacheRatio();
727    }
728    return regionServerIndexWithBestRegionCachedRatio;
729  }
730
731  /**
732   * Finds and return the latest reported cache ratio for the region on the RegionServer it's
733   * currently online.
734   */
735  float getObservedRegionCacheRatio(int region) {
736    Deque<BalancerRegionLoad> dq = regionLoads[region];
737    if (dq == null || dq.isEmpty()) {
738      return 0.0f;
739    }
740    return dq.getLast().getCurrentRegionCacheRatio();
741  }
742
743  /**
744   * Maps region index to rack index
745   */
746  public int getRackForRegion(int region) {
747    return serverIndexToRackIndex[regionIndexToServerIndex[region]];
748  }
749
750  enum LocalityType {
751    SERVER,
752    RACK
753  }
754
755  public void doAction(BalanceAction action) {
756    switch (action.getType()) {
757      case NULL:
758        break;
759      case ASSIGN_REGION:
760        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
761        assert action instanceof AssignRegionAction : action.getClass();
762        AssignRegionAction ar = (AssignRegionAction) action;
763        regionsPerServer[ar.getServer()] =
764          addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
765        regionMoved(ar.getRegion(), -1, ar.getServer());
766        break;
767      case MOVE_REGION:
768        assert action instanceof MoveRegionAction : action.getClass();
769        MoveRegionAction mra = (MoveRegionAction) action;
770        regionsPerServer[mra.getFromServer()] =
771          removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
772        regionsPerServer[mra.getToServer()] =
773          addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
774        regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
775        break;
776      case SWAP_REGIONS:
777        assert action instanceof SwapRegionsAction : action.getClass();
778        SwapRegionsAction a = (SwapRegionsAction) action;
779        regionsPerServer[a.getFromServer()] =
780          replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
781        regionsPerServer[a.getToServer()] =
782          replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
783        regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
784        regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
785        break;
786      case MOVE_BATCH:
787        assert action instanceof MoveBatchAction : action.getClass();
788        MoveBatchAction mba = (MoveBatchAction) action;
789        for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) {
790          Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex);
791          regionsPerServer[serverIndex] =
792            removeRegions(regionsPerServer[serverIndex], regionsToRemove);
793        }
794        for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) {
795          Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex);
796          regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd);
797        }
798        for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
799          regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(),
800            moveRegionAction.getToServer());
801        }
802        break;
803      default:
804        throw new RuntimeException("Unknown action:" + action.getType());
805    }
806  }
807
808  /**
809   * Return true if the placement of region on server would lower the availability of the region in
810   * question
811   * @return true or false
812   */
813  boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
814    if (!serversToIndex.containsKey(serverName.getAddress())) {
815      return false; // safeguard against race between cluster.servers and servers from LB method
816      // args
817    }
818    int server = serversToIndex.get(serverName.getAddress());
819    int region = regionsToIndex.get(regionInfo);
820
821    // Region replicas for same region should better assign to different servers
822    for (int i : regionsPerServer[server]) {
823      RegionInfo otherRegionInfo = regions[i];
824      if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
825        return true;
826      }
827    }
828
829    int primary = regionIndexToPrimaryIndex[region];
830    if (primary == -1) {
831      return false;
832    }
833    // there is a subset relation for server < host < rack
834    // check server first
835    int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary);
836    if (result != 0) {
837      return result > 0;
838    }
839
840    // check host
841    if (multiServersPerHost) {
842      result = checkLocationForPrimary(serverIndexToHostIndex[server],
843        colocatedReplicaCountsPerHost, primary);
844      if (result != 0) {
845        return result > 0;
846      }
847    }
848
849    // check rack
850    if (numRacks > 1) {
851      result = checkLocationForPrimary(serverIndexToRackIndex[server],
852        colocatedReplicaCountsPerRack, primary);
853      if (result != 0) {
854        return result > 0;
855      }
856    }
857    return false;
858  }
859
860  /**
861   * Common method for better solution check.
862   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
863   *                                          colocatedReplicaCountsPerRack
864   * @return 1 for better, -1 for no better, 0 for unknown
865   */
866  private int checkLocationForPrimary(int location,
867    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) {
868    if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) {
869      // check for whether there are other Locations that we can place this region
870      for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) {
871        if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) {
872          return 1; // meaning there is a better Location
873        }
874      }
875      return -1; // there is not a better Location to place this
876    }
877    return 0;
878  }
879
880  void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
881    if (!serversToIndex.containsKey(serverName.getAddress())) {
882      return;
883    }
884    int server = serversToIndex.get(serverName.getAddress());
885    int region = regionsToIndex.get(regionInfo);
886    doAction(new AssignRegionAction(region, server));
887  }
888
889  void regionMoved(int region, int oldServer, int newServer) {
890    regionIndexToServerIndex[region] = newServer;
891    if (initialRegionIndexToServerIndex[region] == newServer) {
892      numMovedRegions--; // region moved back to original location
893    } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
894      numMovedRegions++; // region moved from original location
895    }
896    int tableIndex = regionIndexToTableIndex[region];
897    if (oldServer >= 0) {
898      numRegionsPerServerPerTable[tableIndex][oldServer]--;
899    }
900    numRegionsPerServerPerTable[tableIndex][newServer]++;
901
902    // update for servers
903    int primary = regionIndexToPrimaryIndex[region];
904    if (oldServer >= 0) {
905      colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary);
906    }
907    colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary);
908
909    // update for hosts
910    if (multiServersPerHost) {
911      updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost,
912        oldServer, newServer, primary, region);
913    }
914
915    // update for racks
916    if (numRacks > 1) {
917      updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
918        oldServer, newServer, primary, region);
919    }
920  }
921
922  /**
923   * Common method for per host and per Location region index updates when a region is moved.
924   * @param serverIndexToLocation             serverIndexToHostIndex or serverIndexToLocationIndex
925   * @param regionsPerLocation                regionsPerHost or regionsPerLocation
926   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
927   *                                          colocatedReplicaCountsPerRack
928   */
929  private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation,
930    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer,
931    int primary, int region) {
932    int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1;
933    int newLocation = serverIndexToLocation[newServer];
934    if (newLocation != oldLocation) {
935      regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region);
936      colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary);
937      if (oldLocation >= 0) {
938        regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region);
939        colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary);
940      }
941    }
942
943  }
944
945  int[] removeRegion(int[] regions, int regionIndex) {
946    // TODO: this maybe costly. Consider using linked lists
947    int[] newRegions = new int[regions.length - 1];
948    int i = 0;
949    for (i = 0; i < regions.length; i++) {
950      if (regions[i] == regionIndex) {
951        break;
952      }
953      newRegions[i] = regions[i];
954    }
955    System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
956    return newRegions;
957  }
958
959  int[] addRegion(int[] regions, int regionIndex) {
960    int[] newRegions = new int[regions.length + 1];
961    System.arraycopy(regions, 0, newRegions, 0, regions.length);
962    newRegions[newRegions.length - 1] = regionIndex;
963    return newRegions;
964  }
965
966  int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) {
967    // Calculate the size of the new regions array
968    int newSize = regions.length - regionIndicesToRemove.size();
969    if (newSize < 0) {
970      throw new IllegalStateException(
971        "Region indices mismatch: more regions to remove than in the regions array");
972    }
973
974    int[] newRegions = new int[newSize];
975    int newIndex = 0;
976
977    // Copy only the regions not in the removal set
978    for (int region : regions) {
979      if (!regionIndicesToRemove.contains(region)) {
980        newRegions[newIndex++] = region;
981      }
982    }
983
984    // If the newIndex is smaller than newSize, some regions were missing from the input array
985    if (newIndex != newSize) {
986      throw new IllegalStateException("Region indices mismatch: some regions in the removal "
987        + "set were not found in the regions array");
988    }
989
990    return newRegions;
991  }
992
993  int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) {
994    int[] newRegions = new int[regions.length + regionIndicesToAdd.size()];
995
996    // Copy the existing regions to the new array
997    System.arraycopy(regions, 0, newRegions, 0, regions.length);
998
999    // Add the new regions at the end of the array
1000    int newIndex = regions.length;
1001    for (int regionIndex : regionIndicesToAdd) {
1002      newRegions[newIndex++] = regionIndex;
1003    }
1004
1005    return newRegions;
1006  }
1007
1008  List<Integer> getShuffledServerIndices() {
1009    return shuffledServerIndicesSupplier.get();
1010  }
1011
1012  int[] addRegionSorted(int[] regions, int regionIndex) {
1013    int[] newRegions = new int[regions.length + 1];
1014    int i = 0;
1015    for (i = 0; i < regions.length; i++) { // find the index to insert
1016      if (regions[i] > regionIndex) {
1017        break;
1018      }
1019    }
1020    System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
1021    System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
1022    newRegions[i] = regionIndex;
1023
1024    return newRegions;
1025  }
1026
1027  int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
1028    int i = 0;
1029    for (i = 0; i < regions.length; i++) {
1030      if (regions[i] == regionIndex) {
1031        regions[i] = newRegionIndex;
1032        break;
1033      }
1034    }
1035    return regions;
1036  }
1037
1038  void sortServersByRegionCount() {
1039    Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
1040  }
1041
1042  int getNumRegions(int server) {
1043    return regionsPerServer[server].length;
1044  }
1045
1046  boolean contains(int[] arr, int val) {
1047    return Arrays.binarySearch(arr, val) >= 0;
1048  }
1049
1050  private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
1051
1052  public Comparator<Integer> getNumRegionsComparator() {
1053    return numRegionsComparator;
1054  }
1055
1056  int getLowestLocalityRegionOnServer(int serverIndex) {
1057    if (regionFinder != null) {
1058      float lowestLocality = 1.0f;
1059      int lowestLocalityRegionIndex = -1;
1060      if (regionsPerServer[serverIndex].length == 0) {
1061        // No regions on that region server
1062        return -1;
1063      }
1064      for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
1065        int regionIndex = regionsPerServer[serverIndex][j];
1066        HDFSBlocksDistribution distribution =
1067          regionFinder.getBlockDistribution(regions[regionIndex]);
1068        float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
1069        // skip empty region
1070        if (distribution.getUniqueBlocksTotalWeight() == 0) {
1071          continue;
1072        }
1073        if (locality < lowestLocality) {
1074          lowestLocality = locality;
1075          lowestLocalityRegionIndex = j;
1076        }
1077      }
1078      if (lowestLocalityRegionIndex == -1) {
1079        return -1;
1080      }
1081      if (LOG.isTraceEnabled()) {
1082        LOG.trace("Lowest locality region is "
1083          + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
1084            .getRegionNameAsString()
1085          + " with locality " + lowestLocality + " and its region server contains "
1086          + regionsPerServer[serverIndex].length + " regions");
1087      }
1088      return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
1089    } else {
1090      return -1;
1091    }
1092  }
1093
1094  float getLocalityOfRegion(int region, int server) {
1095    if (regionFinder != null) {
1096      HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
1097      return distribution.getBlockLocalityIndex(servers[server].getHostname());
1098    } else {
1099      return 0f;
1100    }
1101  }
1102
1103  void setNumRegions(int numRegions) {
1104    this.numRegions = numRegions;
1105  }
1106
1107  void setNumMovedRegions(int numMovedRegions) {
1108    this.numMovedRegions = numMovedRegions;
1109  }
1110
1111  public int getMaxReplicas() {
1112    return maxReplicas;
1113  }
1114
1115  void setStopRequestedAt(long stopRequestedAt) {
1116    this.stopRequestedAt = stopRequestedAt;
1117  }
1118
1119  boolean isStopRequested() {
1120    return EnvironmentEdgeManager.currentTime() > stopRequestedAt;
1121  }
1122
1123  Deque<BalancerRegionLoad>[] getRegionLoads() {
1124    return regionLoads;
1125  }
1126
1127  @Override
1128  public String toString() {
1129    StringBuilder desc = new StringBuilder("Cluster={servers=[");
1130    for (ServerName sn : servers) {
1131      desc.append(sn.getAddress().toString()).append(", ");
1132    }
1133    desc.append("], serverIndicesSortedByRegionCount=")
1134      .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
1135      .append(Arrays.deepToString(regionsPerServer));
1136
1137    desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
1138      .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
1139      .append('}');
1140    return desc.toString();
1141  }
1142}