001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.TimeUnit;
031import java.util.function.Supplier;
032import org.agrona.collections.Hashing;
033import org.agrona.collections.Int2IntCounterMap;
034import org.apache.hadoop.hbase.HDFSBlocksDistribution;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionReplicaUtil;
038import org.apache.hadoop.hbase.master.RackManager;
039import org.apache.hadoop.hbase.net.Address;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.util.Pair;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
047
048/**
 * An efficient array-based implementation, similar to ClusterState, for keeping the status of the
 * cluster in terms of region assignment and distribution. Load balancers such as
 * StochasticLoadBalancer use this class because hundreds of thousands of hashmap manipulations
 * are very costly, which is why this class mostly uses indexes and arrays.
053 * <p/>
054 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
055 * topology in terms of server names, hostnames and racks.
056 */
057@InterfaceAudience.Private
058class BalancerClusterState {
059
060  private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);
061
062  ServerName[] servers;
063  // ServerName uniquely identifies a region server. multiple RS can run on the same host
064  String[] hosts;
065  String[] racks;
066  boolean multiServersPerHost = false; // whether or not any host has more than one server
067
068  ArrayList<String> tables;
069  RegionInfo[] regions;
070  Deque<BalancerRegionLoad>[] regionLoads;
071  private RegionHDFSBlockLocationFinder regionFinder;
072
073  int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality
074
075  int[] serverIndexToHostIndex; // serverIndex -> host index
076  int[] serverIndexToRackIndex; // serverIndex -> rack index
077
078  int[][] regionsPerServer; // serverIndex -> region list
079  int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
080  int[][] regionsPerHost; // hostIndex -> list of regions
081  int[][] regionsPerRack; // rackIndex -> region list
082  Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated
083  // replicas by primary region index
084  Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by
085  // primary region index
086  Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by
087  // primary region index
088
089  int[][] serversPerHost; // hostIndex -> list of server indexes
090  int[][] serversPerRack; // rackIndex -> list of server indexes
091  int[] regionIndexToServerIndex; // regionIndex -> serverIndex
092  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
093  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
094  int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> tableIndex -> # regions
095  int[] numRegionsPerTable; // tableIndex -> region count
096  double[] meanRegionsPerTable; // mean region count per table
097  int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
098  boolean hasRegionReplicas = false; // whether there is regions with replicas
099
100  Integer[] serverIndicesSortedByRegionCount;
101  Integer[] serverIndicesSortedByLocality;
102
103  Map<Address, Integer> serversToIndex;
104  Map<String, Integer> hostsToIndex;
105  Map<String, Integer> racksToIndex;
106  Map<String, Integer> tablesToIndex;
107  Map<RegionInfo, Integer> regionsToIndex;
108  float[] localityPerServer;
109
110  int numServers;
111  int numHosts;
112  int numRacks;
113  int numTables;
114  int numRegions;
115  int maxReplicas = 1;
116
117  int numMovedRegions = 0; // num moved regions from the initial configuration
118  Map<ServerName, List<RegionInfo>> clusterState;
119
120  private final RackManager rackManager;
121  // Maps region -> rackIndex -> locality of region on rack
122  private float[][] rackLocalities;
123  // Maps localityType -> region -> [server|rack]Index with highest locality
124  private int[][] regionsToMostLocalEntities;
125  // Maps region -> serverIndex -> regionCacheRatio of a region on a server
126  private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
127  // Maps regionIndex -> serverIndex with best region cache ratio
128  private int[] regionServerIndexWithBestRegionCachedRatio;
129  // Maps regionName -> oldServerName -> cache ratio of the region on the old server
130  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
131  // cache free space available on each server, aligned to the "servers" array indices;
132  long[] serverBlockCacheFreeSize;
133
134  private final Supplier<List<Integer>> shuffledServerIndicesSupplier =
135    Suppliers.memoizeWithExpiration(() -> {
136      Collection<Integer> serverIndices = serversToIndex.values();
137      List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices);
138      Collections.shuffle(shuffledServerIndices);
139      return shuffledServerIndices;
140    }, 5, TimeUnit.SECONDS);
141  private long stopRequestedAt = Long.MAX_VALUE;
142
143  static class DefaultRackManager extends RackManager {
144    @Override
145    public String getRack(ServerName server) {
146      return UNKNOWN_RACK;
147    }
148  }
149
150  BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
151    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
152    RackManager rackManager) {
153    this(null, clusterState, loads, regionFinder, rackManager, null, null);
154  }
155
156  protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
157    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
158    RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio,
159    Map<ServerName, Long> serverBlockCacheFreeByServer) {
160    this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio,
161      serverBlockCacheFreeByServer);
162  }
163
164  @SuppressWarnings("unchecked")
165  BalancerClusterState(Collection<RegionInfo> unassignedRegions,
166    Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
167    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
168    Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio,
169    Map<ServerName, Long> serverBlockCacheFreeByServer) {
170    if (unassignedRegions == null) {
171      unassignedRegions = Collections.emptyList();
172    }
173
174    serversToIndex = new HashMap<>();
175    hostsToIndex = new HashMap<>();
176    racksToIndex = new HashMap<>();
177    tablesToIndex = new HashMap<>();
178
179    // TODO: We should get the list of tables from master
180    tables = new ArrayList<>();
181    this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
182
183    this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;
184
185    numRegions = 0;
186
187    List<List<Integer>> serversPerHostList = new ArrayList<>();
188    List<List<Integer>> serversPerRackList = new ArrayList<>();
189    this.clusterState = clusterState;
190    this.regionFinder = regionFinder;
191
192    // Use servername and port as there can be dead servers in this list. We want everything with
193    // a matching hostname and port to have the same index.
194    for (ServerName sn : clusterState.keySet()) {
195      if (sn == null) {
196        LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); "
197          + "skipping; unassigned regions?");
198        if (LOG.isTraceEnabled()) {
199          LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
200        }
201        continue;
202      }
203      if (serversToIndex.get(sn.getAddress()) == null) {
204        serversToIndex.put(sn.getAddress(), numServers++);
205      }
206      if (!hostsToIndex.containsKey(sn.getHostname())) {
207        hostsToIndex.put(sn.getHostname(), numHosts++);
208        serversPerHostList.add(new ArrayList<>(1));
209      }
210
211      int serverIndex = serversToIndex.get(sn.getAddress());
212      int hostIndex = hostsToIndex.get(sn.getHostname());
213      serversPerHostList.get(hostIndex).add(serverIndex);
214
215      String rack = this.rackManager.getRack(sn);
216
217      if (!racksToIndex.containsKey(rack)) {
218        racksToIndex.put(rack, numRacks++);
219        serversPerRackList.add(new ArrayList<>());
220      }
221      int rackIndex = racksToIndex.get(rack);
222      serversPerRackList.get(rackIndex).add(serverIndex);
223    }
224
225    LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);
226    // Count how many regions there are.
227    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
228      numRegions += entry.getValue().size();
229    }
230    numRegions += unassignedRegions.size();
231
232    regionsToIndex = new HashMap<>(numRegions);
233    servers = new ServerName[numServers];
234    serversPerHost = new int[numHosts][];
235    serversPerRack = new int[numRacks][];
236    regions = new RegionInfo[numRegions];
237    regionIndexToServerIndex = new int[numRegions];
238    initialRegionIndexToServerIndex = new int[numRegions];
239    regionIndexToTableIndex = new int[numRegions];
240    regionIndexToPrimaryIndex = new int[numRegions];
241    regionLoads = new Deque[numRegions];
242
243    regionLocations = new int[numRegions][];
244    serverIndicesSortedByRegionCount = new Integer[numServers];
245    serverIndicesSortedByLocality = new Integer[numServers];
246    localityPerServer = new float[numServers];
247
248    serverIndexToHostIndex = new int[numServers];
249    serverIndexToRackIndex = new int[numServers];
250    regionsPerServer = new int[numServers][];
251    serverIndexToRegionsOffset = new int[numServers];
252    regionsPerHost = new int[numHosts][];
253    regionsPerRack = new int[numRacks][];
254    colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers];
255    colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts];
256    colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks];
257
258    int regionIndex = 0, regionPerServerIndex = 0;
259
260    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
261      if (entry.getKey() == null) {
262        LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
263        continue;
264      }
265      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
266
267      // keep the servername if this is the first server name for this hostname
268      // or this servername has the newest startcode.
269      if (
270        servers[serverIndex] == null
271          || servers[serverIndex].getStartcode() < entry.getKey().getStartcode()
272      ) {
273        servers[serverIndex] = entry.getKey();
274      }
275
276      if (regionsPerServer[serverIndex] != null) {
277        // there is another server with the same hostAndPort in ClusterState.
278        // allocate the array for the total size
279        regionsPerServer[serverIndex] =
280          new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
281      } else {
282        regionsPerServer[serverIndex] = new int[entry.getValue().size()];
283      }
284      colocatedReplicaCountsPerServer[serverIndex] =
285        new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
286      serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
287      serverIndicesSortedByLocality[serverIndex] = serverIndex;
288    }
289
290    hosts = new String[numHosts];
291    for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) {
292      hosts[entry.getValue()] = entry.getKey();
293    }
294    racks = new String[numRacks];
295    for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) {
296      racks[entry.getValue()] = entry.getKey();
297    }
298
299    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
300      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
301      regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
302
303      int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
304      serverIndexToHostIndex[serverIndex] = hostIndex;
305
306      int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
307      serverIndexToRackIndex[serverIndex] = rackIndex;
308
309      for (RegionInfo region : entry.getValue()) {
310        registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
311        regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
312        regionIndex++;
313      }
314      serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
315    }
316
317    for (RegionInfo region : unassignedRegions) {
318      registerRegion(region, regionIndex, -1, loads, regionFinder);
319      regionIndex++;
320    }
321
322    if (LOG.isTraceEnabled()) {
323      for (int i = 0; i < numServers; i++) {
324        LOG.trace("server {} has {} regions", i, regionsPerServer[i].length);
325      }
326    }
327    for (int i = 0; i < serversPerHostList.size(); i++) {
328      serversPerHost[i] = new int[serversPerHostList.get(i).size()];
329      for (int j = 0; j < serversPerHost[i].length; j++) {
330        serversPerHost[i][j] = serversPerHostList.get(i).get(j);
331        LOG.trace("server {} is on host {}", serversPerHostList.get(i).get(j), i);
332      }
333      if (serversPerHost[i].length > 1) {
334        multiServersPerHost = true;
335      }
336    }
337
338    for (int i = 0; i < serversPerRackList.size(); i++) {
339      serversPerRack[i] = new int[serversPerRackList.get(i).size()];
340      for (int j = 0; j < serversPerRack[i].length; j++) {
341        serversPerRack[i][j] = serversPerRackList.get(i).get(j);
342        LOG.trace("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
343      }
344    }
345
346    numTables = tables.size();
347    LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts,
348      numRacks);
349    numRegionsPerServerPerTable = new int[numTables][numServers];
350    numRegionsPerTable = new int[numTables];
351
352    for (int i = 0; i < numTables; i++) {
353      for (int j = 0; j < numServers; j++) {
354        numRegionsPerServerPerTable[i][j] = 0;
355      }
356    }
357
358    for (int i = 0; i < regionIndexToServerIndex.length; i++) {
359      if (regionIndexToServerIndex[i] >= 0) {
360        numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++;
361        numRegionsPerTable[regionIndexToTableIndex[i]]++;
362      }
363    }
364
365    // Avoid repeated computation for planning
366    meanRegionsPerTable = new double[numTables];
367
368    for (int i = 0; i < numTables; i++) {
369      meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
370    }
371
372    for (int i = 0; i < regions.length; i++) {
373      RegionInfo info = regions[i];
374      if (RegionReplicaUtil.isDefaultReplica(info)) {
375        regionIndexToPrimaryIndex[i] = i;
376      } else {
377        hasRegionReplicas = true;
378        RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
379        regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
380      }
381    }
382
383    for (int i = 0; i < regionsPerServer.length; i++) {
384      colocatedReplicaCountsPerServer[i] =
385        new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
386      for (int j = 0; j < regionsPerServer[i].length; j++) {
387        int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
388        colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex);
389      }
390    }
391    // compute regionsPerHost
392    if (multiServersPerHost) {
393      populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost,
394        serversPerHost);
395    }
396
397    // compute regionsPerRack
398    if (numRacks > 1) {
399      populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
400        serversPerRack);
401    }
402
403    this.serverBlockCacheFreeSize = new long[numServers];
404    if (serverBlockCacheFreeByServer != null) {
405      for (int i = 0; i < numServers; i++) {
406        ServerName sn = servers[i];
407        this.serverBlockCacheFreeSize[i] =
408          sn == null ? 0L : serverBlockCacheFreeByServer.getOrDefault(sn, 0L);
409      }
410    }
411  }
412
413  private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
414    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) {
415    for (int i = 0; i < serversPerLocation.length; i++) {
416      int numRegionsPerLocation = 0;
417      for (int j = 0; j < serversPerLocation[i].length; j++) {
418        numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length;
419      }
420      regionsPerLocation[i] = new int[numRegionsPerLocation];
421      colocatedReplicaCountsPerLocation[i] =
422        new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0);
423    }
424
425    for (int i = 0; i < serversPerLocation.length; i++) {
426      int numRegionPerLocationIndex = 0;
427      for (int j = 0; j < serversPerLocation[i].length; j++) {
428        for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) {
429          int region = regionsPerServer[serversPerLocation[i][j]][k];
430          regionsPerLocation[i][numRegionPerLocationIndex] = region;
431          int primaryIndex = regionIndexToPrimaryIndex[region];
432          colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex);
433          numRegionPerLocationIndex++;
434        }
435      }
436    }
437
438  }
439
440  /** Helper for Cluster constructor to handle a region */
441  private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
442    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) {
443    String tableName = region.getTable().getNameAsString();
444    if (!tablesToIndex.containsKey(tableName)) {
445      tables.add(tableName);
446      tablesToIndex.put(tableName, tablesToIndex.size());
447    }
448    int tableIndex = tablesToIndex.get(tableName);
449
450    regionsToIndex.put(region, regionIndex);
451    regions[regionIndex] = region;
452    regionIndexToServerIndex[regionIndex] = serverIndex;
453    initialRegionIndexToServerIndex[regionIndex] = serverIndex;
454    regionIndexToTableIndex[regionIndex] = tableIndex;
455
456    // region load
457    if (loads != null) {
458      Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
459      // That could have failed if the RegionLoad is using the other regionName
460      if (rl == null) {
461        // Try getting the region load using encoded name.
462        rl = loads.get(region.getEncodedName());
463      }
464      regionLoads[regionIndex] = rl;
465    }
466
467    if (regionFinder != null) {
468      // region location
469      List<ServerName> loc = regionFinder.getTopBlockLocations(region);
470      regionLocations[regionIndex] = new int[loc.size()];
471      for (int i = 0; i < loc.size(); i++) {
472        regionLocations[regionIndex][i] = loc.get(i) == null
473          ? -1
474          : (serversToIndex.get(loc.get(i).getAddress()) == null
475            ? -1
476            : serversToIndex.get(loc.get(i).getAddress()));
477      }
478    }
479
480    int numReplicas = region.getReplicaId() + 1;
481    if (numReplicas > maxReplicas) {
482      maxReplicas = numReplicas;
483    }
484  }
485
486  /**
487   * Returns true iff a given server has less regions than the balanced amount
488   */
489  public boolean serverHasTooFewRegions(int server) {
490    int minLoad = this.numRegions / numServers;
491    int numRegions = getNumRegions(server);
492    return numRegions < minLoad;
493  }
494
495  /**
496   * Retrieves and lazily initializes a field storing the locality of every region/server
497   * combination
498   */
499  public float[][] getOrComputeRackLocalities() {
500    if (rackLocalities == null || regionsToMostLocalEntities == null) {
501      computeCachedLocalities();
502    }
503    return rackLocalities;
504  }
505
506  /**
507   * Lazily initializes and retrieves a mapping of region -> server for which region has the highest
508   * the locality
509   */
510  public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) {
511    if (rackLocalities == null || regionsToMostLocalEntities == null) {
512      computeCachedLocalities();
513    }
514    return regionsToMostLocalEntities[type.ordinal()];
515  }
516
517  /**
518   * Looks up locality from cache of localities. Will create cache if it does not already exist.
519   */
520  public float getOrComputeLocality(int region, int entity,
521    BalancerClusterState.LocalityType type) {
522    switch (type) {
523      case SERVER:
524        return getLocalityOfRegion(region, entity);
525      case RACK:
526        return getOrComputeRackLocalities()[region][entity];
527      default:
528        throw new IllegalArgumentException("Unsupported LocalityType: " + type);
529    }
530  }
531
532  /**
533   * Returns locality weighted by region size in MB. Will create locality cache if it does not
534   * already exist.
535   */
536  public double getOrComputeWeightedLocality(int region, int server,
537    BalancerClusterState.LocalityType type) {
538    return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
539  }
540
541  /**
542   * Returns the size in MB from the most recent RegionLoad for region
543   */
544  public int getRegionSizeMB(int region) {
545    Deque<BalancerRegionLoad> load = regionLoads[region];
546    // This means regions have no actual data on disk
547    if (load == null) {
548      return 0;
549    }
550    return load.getLast().getStorefileSizeMB();
551  }
552
553  /**
554   * Finds and return the sum of latest reported cache ratio and cold data ratio for the region on
555   * the RegionServer it's currently online.
556   */
557  float getSumRegionCacheAndColdDataRatio(int region) {
558    Deque<BalancerRegionLoad> dq = regionLoads[region];
559    if (dq == null || dq.isEmpty()) {
560      return 0.0f;
561    }
562    BalancerRegionLoad load = dq.getLast();
563    return load.getCurrentRegionCacheRatio() + load.getRegionColdDataRatio();
564  }
565
566  int getRegionSizeMinusColdDataMB(int region) {
567    Deque<BalancerRegionLoad> dq = regionLoads[region];
568    if (dq == null || dq.isEmpty()) {
569      return 0;
570    }
571    BalancerRegionLoad load = dq.getLast();
572    return load.getRegionSizeMB() - (int) (load.getRegionSizeMB() * load.getRegionColdDataRatio());
573  }
574
575  /**
576   * Computes and caches the locality for each region/rack combinations, as well as storing a
577   * mapping of region -> server and region -> rack such that server and rack have the highest
578   * locality for region
579   */
580  private void computeCachedLocalities() {
581    rackLocalities = new float[numRegions][numRacks];
582    regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
583
584    // Compute localities and find most local server per region
585    for (int region = 0; region < numRegions; region++) {
586      int serverWithBestLocality = 0;
587      float bestLocalityForRegion = 0;
588      for (int server = 0; server < numServers; server++) {
589        // Aggregate per-rack locality
590        float locality = getLocalityOfRegion(region, server);
591        int rack = serverIndexToRackIndex[server];
592        int numServersInRack = serversPerRack[rack].length;
593        rackLocalities[region][rack] += locality / numServersInRack;
594
595        if (locality > bestLocalityForRegion) {
596          serverWithBestLocality = server;
597          bestLocalityForRegion = locality;
598        }
599      }
600      regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
601
602      // Find most local rack per region
603      int rackWithBestLocality = 0;
604      float bestRackLocalityForRegion = 0.0f;
605      for (int rack = 0; rack < numRacks; rack++) {
606        float rackLocality = rackLocalities[region][rack];
607        if (rackLocality > bestRackLocalityForRegion) {
608          bestRackLocalityForRegion = rackLocality;
609          rackWithBestLocality = rack;
610        }
611      }
612      regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
613    }
614
615  }
616
617  /**
618   * Returns the weighted cache ratio of a region on the given region server
619   */
620  public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
621    return getRegionSizeMinusColdDataMB(region) * getOrComputeRegionCacheRatio(region, server);
622  }
623
624  /**
625   * Returns the amount by which a region is cached on a given region server. If the region is not
626   * currently hosted on the given region server, then find out if it was previously hosted there
627   * and return the old cache ratio.
628   */
629  protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
630    float regionCacheRatio = 0.0f;
631
632    // Get the current region cache ratio if the region is hosted on the server regionServerIndex
633    for (int regionIndex : regionsPerServer[regionServerIndex]) {
634      if (region != regionIndex) {
635        continue;
636      }
637
638      Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];
639
640      // The region is currently hosted on this region server. Get the region cache ratio for this
641      // region on this server
642      regionCacheRatio =
643        regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio();
644
645      return regionCacheRatio;
646    }
647
648    // Region is not currently hosted on this server. Check if the region was cached on this
649    // server earlier. This can happen when the server was shutdown and the cache was persisted.
650    // Search using the region name and server name and not the index id and server id as these ids
651    // may change when a server is marked as dead or a new server is added.
652    String regionEncodedName = regions[region].getEncodedName();
653    ServerName serverName = servers[regionServerIndex];
654    if (
655      regionCacheRatioOnOldServerMap != null
656        && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
657    ) {
658      Pair<ServerName, Float> cacheRatioOfRegionOnServer =
659        regionCacheRatioOnOldServerMap.get(regionEncodedName);
660      if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) {
661        regionCacheRatio = cacheRatioOfRegionOnServer.getSecond();
662        if (LOG.isDebugEnabled()) {
663          LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName,
664            serverName, regionCacheRatio);
665        }
666      }
667    }
668    return regionCacheRatio;
669  }
670
671  /**
672   * Populate the maps containing information about how much a region is cached on a region server.
673   */
674  private void computeRegionServerRegionCacheRatio() {
675    regionIndexServerIndexRegionCachedRatio = new HashMap<>();
676    regionServerIndexWithBestRegionCachedRatio = new int[numRegions];
677
678    for (int region = 0; region < numRegions; region++) {
679      float bestRegionCacheRatio = 0.0f;
680      int serverWithBestRegionCacheRatio = 0;
681      for (int server = 0; server < numServers; server++) {
682        float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server);
683        if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) {
684          // A region with cache ratio 0 on a server means nothing. Hence, just make a note of
685          // cache ratio only if the cache ratio is greater than 0.
686          Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
687          regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio);
688        }
689        if (regionCacheRatio > bestRegionCacheRatio) {
690          serverWithBestRegionCacheRatio = server;
691          // If the server currently hosting the region has equal cache ratio to a historical
692          // server, consider the current server to keep hosting the region
693          bestRegionCacheRatio = regionCacheRatio;
694        } else if (
695          regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region]
696        ) {
697          // If two servers have same region cache ratio, then the server currently hosting the
698          // region
699          // should retain the region
700          serverWithBestRegionCacheRatio = server;
701        }
702      }
703      regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio;
704      Pair<Integer, Integer> regionServerPair =
705        new Pair<>(region, regionIndexToServerIndex[region]);
706      float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair);
707      if (tempRegionCacheRatio > bestRegionCacheRatio) {
708        LOG.warn(
709          "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the "
710            + "best region cache ratio {} on server {}",
711          regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
712          tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]);
713      }
714    }
715  }
716
717  protected float getOrComputeRegionCacheRatio(int region, int server) {
718    if (
719      regionServerIndexWithBestRegionCachedRatio == null
720        || regionIndexServerIndexRegionCachedRatio.isEmpty()
721    ) {
722      computeRegionServerRegionCacheRatio();
723    }
724
725    Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
726    return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair)
727      ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair)
728      : 0.0f;
729  }
730
731  public int[] getOrComputeServerWithBestRegionCachedRatio() {
732    if (
733      regionServerIndexWithBestRegionCachedRatio == null
734        || regionIndexServerIndexRegionCachedRatio.isEmpty()
735    ) {
736      computeRegionServerRegionCacheRatio();
737    }
738    return regionServerIndexWithBestRegionCachedRatio;
739  }
740
741  /**
742   * Finds and return the latest reported cache ratio for the region on the RegionServer it's
743   * currently online.
744   */
745  float getObservedRegionCacheRatio(int region) {
746    Deque<BalancerRegionLoad> dq = regionLoads[region];
747    if (dq == null || dq.isEmpty()) {
748      return 0.0f;
749    }
750    return dq.getLast().getCurrentRegionCacheRatio();
751  }
752
753  /**
754   * Maps region index to rack index
755   */
756  public int getRackForRegion(int region) {
757    return serverIndexToRackIndex[regionIndexToServerIndex[region]];
758  }
759
760  enum LocalityType {
761    SERVER,
762    RACK
763  }
764
765  public void doAction(BalanceAction action) {
766    switch (action.getType()) {
767      case NULL:
768        break;
769      case ASSIGN_REGION:
770        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
771        assert action instanceof AssignRegionAction : action.getClass();
772        AssignRegionAction ar = (AssignRegionAction) action;
773        regionsPerServer[ar.getServer()] =
774          addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
775        regionMoved(ar.getRegion(), -1, ar.getServer());
776        break;
777      case MOVE_REGION:
778        assert action instanceof MoveRegionAction : action.getClass();
779        MoveRegionAction mra = (MoveRegionAction) action;
780        regionsPerServer[mra.getFromServer()] =
781          removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
782        regionsPerServer[mra.getToServer()] =
783          addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
784        regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
785        break;
786      case SWAP_REGIONS:
787        assert action instanceof SwapRegionsAction : action.getClass();
788        SwapRegionsAction a = (SwapRegionsAction) action;
789        regionsPerServer[a.getFromServer()] =
790          replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
791        regionsPerServer[a.getToServer()] =
792          replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
793        regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
794        regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
795        break;
796      case MOVE_BATCH:
797        assert action instanceof MoveBatchAction : action.getClass();
798        MoveBatchAction mba = (MoveBatchAction) action;
799        for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) {
800          Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex);
801          regionsPerServer[serverIndex] =
802            removeRegions(regionsPerServer[serverIndex], regionsToRemove);
803        }
804        for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) {
805          Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex);
806          regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd);
807        }
808        for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
809          regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(),
810            moveRegionAction.getToServer());
811        }
812        break;
813      default:
814        throw new RuntimeException("Unknown action:" + action.getType());
815    }
816  }
817
818  /**
819   * Return true if the placement of region on server would lower the availability of the region in
820   * question
821   * @return true or false
822   */
823  boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
824    if (!serversToIndex.containsKey(serverName.getAddress())) {
825      return false; // safeguard against race between cluster.servers and servers from LB method
826      // args
827    }
828    int server = serversToIndex.get(serverName.getAddress());
829    int region = regionsToIndex.get(regionInfo);
830
831    // Region replicas for same region should better assign to different servers
832    for (int i : regionsPerServer[server]) {
833      RegionInfo otherRegionInfo = regions[i];
834      if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
835        return true;
836      }
837    }
838
839    int primary = regionIndexToPrimaryIndex[region];
840    if (primary == -1) {
841      return false;
842    }
843    // there is a subset relation for server < host < rack
844    // check server first
845    int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary);
846    if (result != 0) {
847      return result > 0;
848    }
849
850    // check host
851    if (multiServersPerHost) {
852      result = checkLocationForPrimary(serverIndexToHostIndex[server],
853        colocatedReplicaCountsPerHost, primary);
854      if (result != 0) {
855        return result > 0;
856      }
857    }
858
859    // check rack
860    if (numRacks > 1) {
861      result = checkLocationForPrimary(serverIndexToRackIndex[server],
862        colocatedReplicaCountsPerRack, primary);
863      if (result != 0) {
864        return result > 0;
865      }
866    }
867    return false;
868  }
869
870  /**
871   * Common method for better solution check.
872   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
873   *                                          colocatedReplicaCountsPerRack
874   * @return 1 for better, -1 for no better, 0 for unknown
875   */
876  private int checkLocationForPrimary(int location,
877    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) {
878    if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) {
879      // check for whether there are other Locations that we can place this region
880      for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) {
881        if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) {
882          return 1; // meaning there is a better Location
883        }
884      }
885      return -1; // there is not a better Location to place this
886    }
887    return 0;
888  }
889
890  void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
891    if (!serversToIndex.containsKey(serverName.getAddress())) {
892      return;
893    }
894    int server = serversToIndex.get(serverName.getAddress());
895    int region = regionsToIndex.get(regionInfo);
896    doAction(new AssignRegionAction(region, server));
897  }
898
899  void regionMoved(int region, int oldServer, int newServer) {
900    regionIndexToServerIndex[region] = newServer;
901    if (initialRegionIndexToServerIndex[region] == newServer) {
902      numMovedRegions--; // region moved back to original location
903    } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
904      numMovedRegions++; // region moved from original location
905    }
906    int tableIndex = regionIndexToTableIndex[region];
907    if (oldServer >= 0) {
908      numRegionsPerServerPerTable[tableIndex][oldServer]--;
909    }
910    numRegionsPerServerPerTable[tableIndex][newServer]++;
911
912    // update for servers
913    int primary = regionIndexToPrimaryIndex[region];
914    if (oldServer >= 0) {
915      colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary);
916    }
917    colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary);
918
919    // update for hosts
920    if (multiServersPerHost) {
921      updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost,
922        oldServer, newServer, primary, region);
923    }
924
925    // update for racks
926    if (numRacks > 1) {
927      updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
928        oldServer, newServer, primary, region);
929    }
930  }
931
932  /**
933   * Common method for per host and per Location region index updates when a region is moved.
934   * @param serverIndexToLocation             serverIndexToHostIndex or serverIndexToLocationIndex
935   * @param regionsPerLocation                regionsPerHost or regionsPerLocation
936   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
937   *                                          colocatedReplicaCountsPerRack
938   */
939  private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation,
940    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer,
941    int primary, int region) {
942    int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1;
943    int newLocation = serverIndexToLocation[newServer];
944    if (newLocation != oldLocation) {
945      regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region);
946      colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary);
947      if (oldLocation >= 0) {
948        regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region);
949        colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary);
950      }
951    }
952
953  }
954
955  int[] removeRegion(int[] regions, int regionIndex) {
956    // TODO: this maybe costly. Consider using linked lists
957    int[] newRegions = new int[regions.length - 1];
958    int i = 0;
959    for (i = 0; i < regions.length; i++) {
960      if (regions[i] == regionIndex) {
961        break;
962      }
963      newRegions[i] = regions[i];
964    }
965    System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
966    return newRegions;
967  }
968
969  int[] addRegion(int[] regions, int regionIndex) {
970    int[] newRegions = new int[regions.length + 1];
971    System.arraycopy(regions, 0, newRegions, 0, regions.length);
972    newRegions[newRegions.length - 1] = regionIndex;
973    return newRegions;
974  }
975
976  int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) {
977    // Calculate the size of the new regions array
978    int newSize = regions.length - regionIndicesToRemove.size();
979    if (newSize < 0) {
980      throw new IllegalStateException(
981        "Region indices mismatch: more regions to remove than in the regions array");
982    }
983
984    int[] newRegions = new int[newSize];
985    int newIndex = 0;
986
987    // Copy only the regions not in the removal set
988    for (int region : regions) {
989      if (!regionIndicesToRemove.contains(region)) {
990        newRegions[newIndex++] = region;
991      }
992    }
993
994    // If the newIndex is smaller than newSize, some regions were missing from the input array
995    if (newIndex != newSize) {
996      throw new IllegalStateException("Region indices mismatch: some regions in the removal "
997        + "set were not found in the regions array");
998    }
999
1000    return newRegions;
1001  }
1002
1003  int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) {
1004    int[] newRegions = new int[regions.length + regionIndicesToAdd.size()];
1005
1006    // Copy the existing regions to the new array
1007    System.arraycopy(regions, 0, newRegions, 0, regions.length);
1008
1009    // Add the new regions at the end of the array
1010    int newIndex = regions.length;
1011    for (int regionIndex : regionIndicesToAdd) {
1012      newRegions[newIndex++] = regionIndex;
1013    }
1014
1015    return newRegions;
1016  }
1017
1018  List<Integer> getShuffledServerIndices() {
1019    return shuffledServerIndicesSupplier.get();
1020  }
1021
1022  int[] addRegionSorted(int[] regions, int regionIndex) {
1023    int[] newRegions = new int[regions.length + 1];
1024    int i = 0;
1025    for (i = 0; i < regions.length; i++) { // find the index to insert
1026      if (regions[i] > regionIndex) {
1027        break;
1028      }
1029    }
1030    System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
1031    System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
1032    newRegions[i] = regionIndex;
1033
1034    return newRegions;
1035  }
1036
1037  int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
1038    int i = 0;
1039    for (i = 0; i < regions.length; i++) {
1040      if (regions[i] == regionIndex) {
1041        regions[i] = newRegionIndex;
1042        break;
1043      }
1044    }
1045    return regions;
1046  }
1047
1048  void sortServersByRegionCount() {
1049    Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
1050  }
1051
1052  int getNumRegions(int server) {
1053    return regionsPerServer[server].length;
1054  }
1055
1056  boolean contains(int[] arr, int val) {
1057    return Arrays.binarySearch(arr, val) >= 0;
1058  }
1059
1060  private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
1061
1062  public Comparator<Integer> getNumRegionsComparator() {
1063    return numRegionsComparator;
1064  }
1065
1066  int getLowestLocalityRegionOnServer(int serverIndex) {
1067    if (regionFinder != null) {
1068      float lowestLocality = 1.0f;
1069      int lowestLocalityRegionIndex = -1;
1070      if (regionsPerServer[serverIndex].length == 0) {
1071        // No regions on that region server
1072        return -1;
1073      }
1074      for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
1075        int regionIndex = regionsPerServer[serverIndex][j];
1076        HDFSBlocksDistribution distribution =
1077          regionFinder.getBlockDistribution(regions[regionIndex]);
1078        float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
1079        // skip empty region
1080        if (distribution.getUniqueBlocksTotalWeight() == 0) {
1081          continue;
1082        }
1083        if (locality < lowestLocality) {
1084          lowestLocality = locality;
1085          lowestLocalityRegionIndex = j;
1086        }
1087      }
1088      if (lowestLocalityRegionIndex == -1) {
1089        return -1;
1090      }
1091      if (LOG.isTraceEnabled()) {
1092        LOG.trace("Lowest locality region is "
1093          + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
1094            .getRegionNameAsString()
1095          + " with locality " + lowestLocality + " and its region server contains "
1096          + regionsPerServer[serverIndex].length + " regions");
1097      }
1098      return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
1099    } else {
1100      return -1;
1101    }
1102  }
1103
1104  float getLocalityOfRegion(int region, int server) {
1105    if (regionFinder != null) {
1106      HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
1107      return distribution.getBlockLocalityIndex(servers[server].getHostname());
1108    } else {
1109      return 0f;
1110    }
1111  }
1112
1113  void setNumRegions(int numRegions) {
1114    this.numRegions = numRegions;
1115  }
1116
1117  void setNumMovedRegions(int numMovedRegions) {
1118    this.numMovedRegions = numMovedRegions;
1119  }
1120
1121  public int getMaxReplicas() {
1122    return maxReplicas;
1123  }
1124
1125  void setStopRequestedAt(long stopRequestedAt) {
1126    this.stopRequestedAt = stopRequestedAt;
1127  }
1128
1129  boolean isStopRequested() {
1130    return EnvironmentEdgeManager.currentTime() > stopRequestedAt;
1131  }
1132
1133  Deque<BalancerRegionLoad>[] getRegionLoads() {
1134    return regionLoads;
1135  }
1136
1137  @Override
1138  public String toString() {
1139    StringBuilder desc = new StringBuilder("Cluster={servers=[");
1140    for (ServerName sn : servers) {
1141      desc.append(sn.getAddress().toString()).append(", ");
1142    }
1143    desc.append("], serverIndicesSortedByRegionCount=")
1144      .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
1145      .append(Arrays.deepToString(regionsPerServer));
1146
1147    desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
1148      .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
1149      .append('}');
1150    return desc.toString();
1151  }
1152}