001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import org.agrona.collections.Hashing;
030import org.agrona.collections.Int2IntCounterMap;
031import org.apache.hadoop.hbase.HDFSBlocksDistribution;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.RegionReplicaUtil;
035import org.apache.hadoop.hbase.master.RackManager;
036import org.apache.hadoop.hbase.net.Address;
037import org.apache.hadoop.hbase.util.Pair;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
 * An efficient array-based implementation similar to ClusterState for keeping the status of the
 * cluster in terms of region assignment and distribution. LoadBalancers such as
 * StochasticLoadBalancer use this Cluster object because hundreds of thousands of hashmap
 * manipulations are very costly, which is why this class mostly uses indexes and arrays.
047 * <p/>
048 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
049 * topology in terms of server names, hostnames and racks.
050 */
051@InterfaceAudience.Private
052class BalancerClusterState {
053
  private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);

  // serverIndex -> ServerName. ServerNames sharing a host and port share one index; the one with
  // the newest start code is kept.
  ServerName[] servers;
  // hostIndex -> hostname. Multiple region servers can run on the same host.
  String[] hosts;
  // rackIndex -> rack name
  String[] racks;
  boolean multiServersPerHost = false; // whether or not any host has more than one server

  ArrayList<String> tables; // tableIndex -> table name, in order of first appearance
  RegionInfo[] regions; // regionIndex -> region; assigned regions first, then unassigned ones
  Deque<BalancerRegionLoad>[] regionLoads; // regionIndex -> recent loads, null when none known
  private RegionHDFSBlockLocationFinder regionFinder;

  // regionIndex -> server indexes of the region's top block locations, in the order returned by
  // the location finder (presumably best locality first - TODO confirm); -1 marks a location that
  // is not a server of this cluster. Only filled when a location finder was supplied.
  int[][] regionLocations;

  int[] serverIndexToHostIndex; // serverIndex -> host index
  int[] serverIndexToRackIndex; // serverIndex -> rack index

  int[][] regionsPerServer; // serverIndex -> region list
  // serverIndex -> number of regionsPerServer entries filled so far (used while constructing)
  int[] serverIndexToRegionsOffset;
  int[][] regionsPerHost; // hostIndex -> list of regions; only filled if some host has >1 server
  int[][] regionsPerRack; // rackIndex -> region list; only filled if there is more than one rack
  // serverIndex -> (primary region index -> number of regions on the server whose primary is
  // that region, the primary itself included)
  Int2IntCounterMap[] colocatedReplicaCountsPerServer;
  // hostIndex -> same counts as above aggregated per host; only filled if some host has >1 server
  Int2IntCounterMap[] colocatedReplicaCountsPerHost;
  // rackIndex -> same counts as above aggregated per rack; only filled if there is >1 rack
  Int2IntCounterMap[] colocatedReplicaCountsPerRack;

  int[][] serversPerHost; // hostIndex -> list of server indexes
  int[][] serversPerRack; // rackIndex -> list of server indexes
  int[] regionIndexToServerIndex; // regionIndex -> serverIndex, -1 for unassigned regions
  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
  int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
  int[] numRegionsPerTable; // tableIndex -> number of assigned regions
  double[] meanRegionsPerTable; // tableIndex -> mean number of regions per server
  // regionIndex -> regionIndex of the primary; -1 if the primary is not part of this state
  int[] regionIndexToPrimaryIndex;
  boolean hasRegionReplicas = false; // whether any non-default replica region is present

  Integer[] serverIndicesSortedByRegionCount;
  Integer[] serverIndicesSortedByLocality;

  Map<Address, Integer> serversToIndex; // host:port -> serverIndex
  Map<String, Integer> hostsToIndex; // hostname -> hostIndex
  Map<String, Integer> racksToIndex; // rack name -> rackIndex
  Map<String, Integer> tablesToIndex; // table name -> tableIndex
  Map<RegionInfo, Integer> regionsToIndex; // region -> regionIndex
  float[] localityPerServer;

  int numServers;
  int numHosts;
  int numRacks;
  int numTables;
  int numRegions;

  int numMovedRegions = 0; // num moved regions from the initial configuration
  Map<ServerName, List<RegionInfo>> clusterState;

  private final RackManager rackManager;
  // Maps region -> rackIndex -> locality of region on rack (computed lazily)
  private float[][] rackLocalities;
  // Maps localityType -> region -> [server|rack]Index with highest locality (computed lazily)
  private int[][] regionsToMostLocalEntities;
  // Maps (regionIndex, serverIndex) -> cache ratio of the region on the server. Only positive
  // ratios and the ratio on the region's hosting server at computation time are stored.
  private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
  // Maps regionIndex -> serverIndex with best region cache ratio
  private int[] regionServerIndexWithBestRegionCachedRatio;
  // Maps encoded region name -> (old server name, cache ratio of the region on the old server)
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
124
125  static class DefaultRackManager extends RackManager {
126    @Override
127    public String getRack(ServerName server) {
128      return UNKNOWN_RACK;
129    }
130  }
131
132  BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
133    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
134    RackManager rackManager) {
135    this(null, clusterState, loads, regionFinder, rackManager, null);
136  }
137
138  protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
139    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
140    RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
141    this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
142  }
143
144  @SuppressWarnings("unchecked")
145  BalancerClusterState(Collection<RegionInfo> unassignedRegions,
146    Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
147    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
148    Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
149    if (unassignedRegions == null) {
150      unassignedRegions = Collections.emptyList();
151    }
152
153    serversToIndex = new HashMap<>();
154    hostsToIndex = new HashMap<>();
155    racksToIndex = new HashMap<>();
156    tablesToIndex = new HashMap<>();
157
158    // TODO: We should get the list of tables from master
159    tables = new ArrayList<>();
160    this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
161
162    this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;
163
164    numRegions = 0;
165
166    List<List<Integer>> serversPerHostList = new ArrayList<>();
167    List<List<Integer>> serversPerRackList = new ArrayList<>();
168    this.clusterState = clusterState;
169    this.regionFinder = regionFinder;
170
171    // Use servername and port as there can be dead servers in this list. We want everything with
172    // a matching hostname and port to have the same index.
173    for (ServerName sn : clusterState.keySet()) {
174      if (sn == null) {
175        LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); "
176          + "skipping; unassigned regions?");
177        if (LOG.isTraceEnabled()) {
178          LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
179        }
180        continue;
181      }
182      if (serversToIndex.get(sn.getAddress()) == null) {
183        serversToIndex.put(sn.getAddress(), numServers++);
184      }
185      if (!hostsToIndex.containsKey(sn.getHostname())) {
186        hostsToIndex.put(sn.getHostname(), numHosts++);
187        serversPerHostList.add(new ArrayList<>(1));
188      }
189
190      int serverIndex = serversToIndex.get(sn.getAddress());
191      int hostIndex = hostsToIndex.get(sn.getHostname());
192      serversPerHostList.get(hostIndex).add(serverIndex);
193
194      String rack = this.rackManager.getRack(sn);
195
196      if (!racksToIndex.containsKey(rack)) {
197        racksToIndex.put(rack, numRacks++);
198        serversPerRackList.add(new ArrayList<>());
199      }
200      int rackIndex = racksToIndex.get(rack);
201      serversPerRackList.get(rackIndex).add(serverIndex);
202    }
203
204    LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);
205    // Count how many regions there are.
206    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
207      numRegions += entry.getValue().size();
208    }
209    numRegions += unassignedRegions.size();
210
211    regionsToIndex = new HashMap<>(numRegions);
212    servers = new ServerName[numServers];
213    serversPerHost = new int[numHosts][];
214    serversPerRack = new int[numRacks][];
215    regions = new RegionInfo[numRegions];
216    regionIndexToServerIndex = new int[numRegions];
217    initialRegionIndexToServerIndex = new int[numRegions];
218    regionIndexToTableIndex = new int[numRegions];
219    regionIndexToPrimaryIndex = new int[numRegions];
220    regionLoads = new Deque[numRegions];
221
222    regionLocations = new int[numRegions][];
223    serverIndicesSortedByRegionCount = new Integer[numServers];
224    serverIndicesSortedByLocality = new Integer[numServers];
225    localityPerServer = new float[numServers];
226
227    serverIndexToHostIndex = new int[numServers];
228    serverIndexToRackIndex = new int[numServers];
229    regionsPerServer = new int[numServers][];
230    serverIndexToRegionsOffset = new int[numServers];
231    regionsPerHost = new int[numHosts][];
232    regionsPerRack = new int[numRacks][];
233    colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers];
234    colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts];
235    colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks];
236
237    int regionIndex = 0, regionPerServerIndex = 0;
238
239    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
240      if (entry.getKey() == null) {
241        LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
242        continue;
243      }
244      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
245
246      // keep the servername if this is the first server name for this hostname
247      // or this servername has the newest startcode.
248      if (
249        servers[serverIndex] == null
250          || servers[serverIndex].getStartcode() < entry.getKey().getStartcode()
251      ) {
252        servers[serverIndex] = entry.getKey();
253      }
254
255      if (regionsPerServer[serverIndex] != null) {
256        // there is another server with the same hostAndPort in ClusterState.
257        // allocate the array for the total size
258        regionsPerServer[serverIndex] =
259          new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
260      } else {
261        regionsPerServer[serverIndex] = new int[entry.getValue().size()];
262      }
263      colocatedReplicaCountsPerServer[serverIndex] =
264        new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
265      serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
266      serverIndicesSortedByLocality[serverIndex] = serverIndex;
267    }
268
269    hosts = new String[numHosts];
270    for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) {
271      hosts[entry.getValue()] = entry.getKey();
272    }
273    racks = new String[numRacks];
274    for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) {
275      racks[entry.getValue()] = entry.getKey();
276    }
277
278    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
279      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
280      regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
281
282      int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
283      serverIndexToHostIndex[serverIndex] = hostIndex;
284
285      int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
286      serverIndexToRackIndex[serverIndex] = rackIndex;
287
288      for (RegionInfo region : entry.getValue()) {
289        registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
290        regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
291        regionIndex++;
292      }
293      serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
294    }
295
296    for (RegionInfo region : unassignedRegions) {
297      registerRegion(region, regionIndex, -1, loads, regionFinder);
298      regionIndex++;
299    }
300
301    if (LOG.isDebugEnabled()) {
302      for (int i = 0; i < numServers; i++) {
303        LOG.debug("server {} has {} regions", i, regionsPerServer[i].length);
304      }
305    }
306    for (int i = 0; i < serversPerHostList.size(); i++) {
307      serversPerHost[i] = new int[serversPerHostList.get(i).size()];
308      for (int j = 0; j < serversPerHost[i].length; j++) {
309        serversPerHost[i][j] = serversPerHostList.get(i).get(j);
310        LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i);
311      }
312      if (serversPerHost[i].length > 1) {
313        multiServersPerHost = true;
314      }
315    }
316
317    for (int i = 0; i < serversPerRackList.size(); i++) {
318      serversPerRack[i] = new int[serversPerRackList.get(i).size()];
319      for (int j = 0; j < serversPerRack[i].length; j++) {
320        serversPerRack[i][j] = serversPerRackList.get(i).get(j);
321        LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
322      }
323    }
324
325    numTables = tables.size();
326    LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts,
327      numRacks);
328    numRegionsPerServerPerTable = new int[numTables][numServers];
329    numRegionsPerTable = new int[numTables];
330
331    for (int i = 0; i < numTables; i++) {
332      for (int j = 0; j < numServers; j++) {
333        numRegionsPerServerPerTable[i][j] = 0;
334      }
335    }
336
337    for (int i = 0; i < regionIndexToServerIndex.length; i++) {
338      if (regionIndexToServerIndex[i] >= 0) {
339        numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++;
340        numRegionsPerTable[regionIndexToTableIndex[i]]++;
341      }
342    }
343
344    // Avoid repeated computation for planning
345    meanRegionsPerTable = new double[numTables];
346
347    for (int i = 0; i < numTables; i++) {
348      meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
349    }
350
351    for (int i = 0; i < regions.length; i++) {
352      RegionInfo info = regions[i];
353      if (RegionReplicaUtil.isDefaultReplica(info)) {
354        regionIndexToPrimaryIndex[i] = i;
355      } else {
356        hasRegionReplicas = true;
357        RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
358        regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
359      }
360    }
361
362    for (int i = 0; i < regionsPerServer.length; i++) {
363      colocatedReplicaCountsPerServer[i] =
364        new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
365      for (int j = 0; j < regionsPerServer[i].length; j++) {
366        int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
367        colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex);
368      }
369    }
370    // compute regionsPerHost
371    if (multiServersPerHost) {
372      populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost,
373        serversPerHost);
374    }
375
376    // compute regionsPerRack
377    if (numRacks > 1) {
378      populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
379        serversPerRack);
380    }
381  }
382
383  private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
384    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) {
385    for (int i = 0; i < serversPerLocation.length; i++) {
386      int numRegionsPerLocation = 0;
387      for (int j = 0; j < serversPerLocation[i].length; j++) {
388        numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length;
389      }
390      regionsPerLocation[i] = new int[numRegionsPerLocation];
391      colocatedReplicaCountsPerLocation[i] =
392        new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0);
393    }
394
395    for (int i = 0; i < serversPerLocation.length; i++) {
396      int numRegionPerLocationIndex = 0;
397      for (int j = 0; j < serversPerLocation[i].length; j++) {
398        for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) {
399          int region = regionsPerServer[serversPerLocation[i][j]][k];
400          regionsPerLocation[i][numRegionPerLocationIndex] = region;
401          int primaryIndex = regionIndexToPrimaryIndex[region];
402          colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex);
403          numRegionPerLocationIndex++;
404        }
405      }
406    }
407
408  }
409
410  /** Helper for Cluster constructor to handle a region */
411  private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
412    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) {
413    String tableName = region.getTable().getNameAsString();
414    if (!tablesToIndex.containsKey(tableName)) {
415      tables.add(tableName);
416      tablesToIndex.put(tableName, tablesToIndex.size());
417    }
418    int tableIndex = tablesToIndex.get(tableName);
419
420    regionsToIndex.put(region, regionIndex);
421    regions[regionIndex] = region;
422    regionIndexToServerIndex[regionIndex] = serverIndex;
423    initialRegionIndexToServerIndex[regionIndex] = serverIndex;
424    regionIndexToTableIndex[regionIndex] = tableIndex;
425
426    // region load
427    if (loads != null) {
428      Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
429      // That could have failed if the RegionLoad is using the other regionName
430      if (rl == null) {
431        // Try getting the region load using encoded name.
432        rl = loads.get(region.getEncodedName());
433      }
434      regionLoads[regionIndex] = rl;
435    }
436
437    if (regionFinder != null) {
438      // region location
439      List<ServerName> loc = regionFinder.getTopBlockLocations(region);
440      regionLocations[regionIndex] = new int[loc.size()];
441      for (int i = 0; i < loc.size(); i++) {
442        regionLocations[regionIndex][i] = loc.get(i) == null
443          ? -1
444          : (serversToIndex.get(loc.get(i).getAddress()) == null
445            ? -1
446            : serversToIndex.get(loc.get(i).getAddress()));
447      }
448    }
449  }
450
451  /**
452   * Returns true iff a given server has less regions than the balanced amount
453   */
454  public boolean serverHasTooFewRegions(int server) {
455    int minLoad = this.numRegions / numServers;
456    int numRegions = getNumRegions(server);
457    return numRegions < minLoad;
458  }
459
460  /**
461   * Retrieves and lazily initializes a field storing the locality of every region/server
462   * combination
463   */
464  public float[][] getOrComputeRackLocalities() {
465    if (rackLocalities == null || regionsToMostLocalEntities == null) {
466      computeCachedLocalities();
467    }
468    return rackLocalities;
469  }
470
471  /**
472   * Lazily initializes and retrieves a mapping of region -> server for which region has the highest
473   * the locality
474   */
475  public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) {
476    if (rackLocalities == null || regionsToMostLocalEntities == null) {
477      computeCachedLocalities();
478    }
479    return regionsToMostLocalEntities[type.ordinal()];
480  }
481
482  /**
483   * Looks up locality from cache of localities. Will create cache if it does not already exist.
484   */
485  public float getOrComputeLocality(int region, int entity,
486    BalancerClusterState.LocalityType type) {
487    switch (type) {
488      case SERVER:
489        return getLocalityOfRegion(region, entity);
490      case RACK:
491        return getOrComputeRackLocalities()[region][entity];
492      default:
493        throw new IllegalArgumentException("Unsupported LocalityType: " + type);
494    }
495  }
496
497  /**
498   * Returns locality weighted by region size in MB. Will create locality cache if it does not
499   * already exist.
500   */
501  public double getOrComputeWeightedLocality(int region, int server,
502    BalancerClusterState.LocalityType type) {
503    return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
504  }
505
506  /**
507   * Returns the size in MB from the most recent RegionLoad for region
508   */
509  public int getRegionSizeMB(int region) {
510    Deque<BalancerRegionLoad> load = regionLoads[region];
511    // This means regions have no actual data on disk
512    if (load == null) {
513      return 0;
514    }
515    return regionLoads[region].getLast().getStorefileSizeMB();
516  }
517
518  /**
519   * Computes and caches the locality for each region/rack combinations, as well as storing a
520   * mapping of region -> server and region -> rack such that server and rack have the highest
521   * locality for region
522   */
523  private void computeCachedLocalities() {
524    rackLocalities = new float[numRegions][numRacks];
525    regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
526
527    // Compute localities and find most local server per region
528    for (int region = 0; region < numRegions; region++) {
529      int serverWithBestLocality = 0;
530      float bestLocalityForRegion = 0;
531      for (int server = 0; server < numServers; server++) {
532        // Aggregate per-rack locality
533        float locality = getLocalityOfRegion(region, server);
534        int rack = serverIndexToRackIndex[server];
535        int numServersInRack = serversPerRack[rack].length;
536        rackLocalities[region][rack] += locality / numServersInRack;
537
538        if (locality > bestLocalityForRegion) {
539          serverWithBestLocality = server;
540          bestLocalityForRegion = locality;
541        }
542      }
543      regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
544
545      // Find most local rack per region
546      int rackWithBestLocality = 0;
547      float bestRackLocalityForRegion = 0.0f;
548      for (int rack = 0; rack < numRacks; rack++) {
549        float rackLocality = rackLocalities[region][rack];
550        if (rackLocality > bestRackLocalityForRegion) {
551          bestRackLocalityForRegion = rackLocality;
552          rackWithBestLocality = rack;
553        }
554      }
555      regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
556    }
557
558  }
559
560  /**
561   * Returns the size of hFiles from the most recent RegionLoad for region
562   */
563  public int getTotalRegionHFileSizeMB(int region) {
564    Deque<BalancerRegionLoad> load = regionLoads[region];
565    if (load == null) {
566      // This means, that the region has no actual data on disk
567      return 0;
568    }
569    return regionLoads[region].getLast().getRegionSizeMB();
570  }
571
572  /**
573   * Returns the weighted cache ratio of a region on the given region server
574   */
575  public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
576    return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server);
577  }
578
579  /**
580   * Returns the amount by which a region is cached on a given region server. If the region is not
581   * currently hosted on the given region server, then find out if it was previously hosted there
582   * and return the old cache ratio.
583   */
584  protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
585    float regionCacheRatio = 0.0f;
586
587    // Get the current region cache ratio if the region is hosted on the server regionServerIndex
588    for (int regionIndex : regionsPerServer[regionServerIndex]) {
589      if (region != regionIndex) {
590        continue;
591      }
592
593      Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];
594
595      // The region is currently hosted on this region server. Get the region cache ratio for this
596      // region on this server
597      regionCacheRatio =
598        regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio();
599
600      return regionCacheRatio;
601    }
602
603    // Region is not currently hosted on this server. Check if the region was cached on this
604    // server earlier. This can happen when the server was shutdown and the cache was persisted.
605    // Search using the region name and server name and not the index id and server id as these ids
606    // may change when a server is marked as dead or a new server is added.
607    String regionEncodedName = regions[region].getEncodedName();
608    ServerName serverName = servers[regionServerIndex];
609    if (
610      regionCacheRatioOnOldServerMap != null
611        && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
612    ) {
613      Pair<ServerName, Float> cacheRatioOfRegionOnServer =
614        regionCacheRatioOnOldServerMap.get(regionEncodedName);
615      if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) {
616        regionCacheRatio = cacheRatioOfRegionOnServer.getSecond();
617        if (LOG.isDebugEnabled()) {
618          LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName,
619            serverName, regionCacheRatio);
620        }
621      }
622    }
623    return regionCacheRatio;
624  }
625
626  /**
627   * Populate the maps containing information about how much a region is cached on a region server.
628   */
629  private void computeRegionServerRegionCacheRatio() {
630    regionIndexServerIndexRegionCachedRatio = new HashMap<>();
631    regionServerIndexWithBestRegionCachedRatio = new int[numRegions];
632
633    for (int region = 0; region < numRegions; region++) {
634      float bestRegionCacheRatio = 0.0f;
635      int serverWithBestRegionCacheRatio = 0;
636      for (int server = 0; server < numServers; server++) {
637        float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server);
638        if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) {
639          // A region with cache ratio 0 on a server means nothing. Hence, just make a note of
640          // cache ratio only if the cache ratio is greater than 0.
641          Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
642          regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio);
643        }
644        if (regionCacheRatio > bestRegionCacheRatio) {
645          serverWithBestRegionCacheRatio = server;
646          // If the server currently hosting the region has equal cache ratio to a historical
647          // server, consider the current server to keep hosting the region
648          bestRegionCacheRatio = regionCacheRatio;
649        } else if (
650          regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region]
651        ) {
652          // If two servers have same region cache ratio, then the server currently hosting the
653          // region
654          // should retain the region
655          serverWithBestRegionCacheRatio = server;
656        }
657      }
658      regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio;
659      Pair<Integer, Integer> regionServerPair =
660        new Pair<>(region, regionIndexToServerIndex[region]);
661      float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair);
662      if (tempRegionCacheRatio > bestRegionCacheRatio) {
663        LOG.warn(
664          "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the "
665            + "best region cache ratio {} on server {}",
666          regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
667          tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]);
668      }
669    }
670  }
671
672  protected float getOrComputeRegionCacheRatio(int region, int server) {
673    if (
674      regionServerIndexWithBestRegionCachedRatio == null
675        || regionIndexServerIndexRegionCachedRatio.isEmpty()
676    ) {
677      computeRegionServerRegionCacheRatio();
678    }
679
680    Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
681    return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair)
682      ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair)
683      : 0.0f;
684  }
685
686  public int[] getOrComputeServerWithBestRegionCachedRatio() {
687    if (
688      regionServerIndexWithBestRegionCachedRatio == null
689        || regionIndexServerIndexRegionCachedRatio.isEmpty()
690    ) {
691      computeRegionServerRegionCacheRatio();
692    }
693    return regionServerIndexWithBestRegionCachedRatio;
694  }
695
696  /**
697   * Maps region index to rack index
698   */
699  public int getRackForRegion(int region) {
700    return serverIndexToRackIndex[regionIndexToServerIndex[region]];
701  }
702
703  enum LocalityType {
704    SERVER,
705    RACK
706  }
707
708  public void doAction(BalanceAction action) {
709    switch (action.getType()) {
710      case NULL:
711        break;
712      case ASSIGN_REGION:
713        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
714        assert action instanceof AssignRegionAction : action.getClass();
715        AssignRegionAction ar = (AssignRegionAction) action;
716        regionsPerServer[ar.getServer()] =
717          addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
718        regionMoved(ar.getRegion(), -1, ar.getServer());
719        break;
720      case MOVE_REGION:
721        assert action instanceof MoveRegionAction : action.getClass();
722        MoveRegionAction mra = (MoveRegionAction) action;
723        regionsPerServer[mra.getFromServer()] =
724          removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
725        regionsPerServer[mra.getToServer()] =
726          addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
727        regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
728        break;
729      case SWAP_REGIONS:
730        assert action instanceof SwapRegionsAction : action.getClass();
731        SwapRegionsAction a = (SwapRegionsAction) action;
732        regionsPerServer[a.getFromServer()] =
733          replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
734        regionsPerServer[a.getToServer()] =
735          replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
736        regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
737        regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
738        break;
739      default:
740        throw new RuntimeException("Uknown action:" + action.getType());
741    }
742  }
743
744  /**
745   * Return true if the placement of region on server would lower the availability of the region in
746   * question
747   * @return true or false
748   */
749  boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
750    if (!serversToIndex.containsKey(serverName.getAddress())) {
751      return false; // safeguard against race between cluster.servers and servers from LB method
752      // args
753    }
754    int server = serversToIndex.get(serverName.getAddress());
755    int region = regionsToIndex.get(regionInfo);
756
757    // Region replicas for same region should better assign to different servers
758    for (int i : regionsPerServer[server]) {
759      RegionInfo otherRegionInfo = regions[i];
760      if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
761        return true;
762      }
763    }
764
765    int primary = regionIndexToPrimaryIndex[region];
766    if (primary == -1) {
767      return false;
768    }
769    // there is a subset relation for server < host < rack
770    // check server first
771    int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary);
772    if (result != 0) {
773      return result > 0;
774    }
775
776    // check host
777    if (multiServersPerHost) {
778      result = checkLocationForPrimary(serverIndexToHostIndex[server],
779        colocatedReplicaCountsPerHost, primary);
780      if (result != 0) {
781        return result > 0;
782      }
783    }
784
785    // check rack
786    if (numRacks > 1) {
787      result = checkLocationForPrimary(serverIndexToRackIndex[server],
788        colocatedReplicaCountsPerRack, primary);
789      if (result != 0) {
790        return result > 0;
791      }
792    }
793    return false;
794  }
795
796  /**
797   * Common method for better solution check.
798   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
799   *                                          colocatedReplicaCountsPerRack
800   * @return 1 for better, -1 for no better, 0 for unknown
801   */
802  private int checkLocationForPrimary(int location,
803    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) {
804    if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) {
805      // check for whether there are other Locations that we can place this region
806      for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) {
807        if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) {
808          return 1; // meaning there is a better Location
809        }
810      }
811      return -1; // there is not a better Location to place this
812    }
813    return 0;
814  }
815
816  void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
817    if (!serversToIndex.containsKey(serverName.getAddress())) {
818      return;
819    }
820    int server = serversToIndex.get(serverName.getAddress());
821    int region = regionsToIndex.get(regionInfo);
822    doAction(new AssignRegionAction(region, server));
823  }
824
825  void regionMoved(int region, int oldServer, int newServer) {
826    regionIndexToServerIndex[region] = newServer;
827    if (initialRegionIndexToServerIndex[region] == newServer) {
828      numMovedRegions--; // region moved back to original location
829    } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
830      numMovedRegions++; // region moved from original location
831    }
832    int tableIndex = regionIndexToTableIndex[region];
833    if (oldServer >= 0) {
834      numRegionsPerServerPerTable[tableIndex][oldServer]--;
835    }
836    numRegionsPerServerPerTable[tableIndex][newServer]++;
837
838    // update for servers
839    int primary = regionIndexToPrimaryIndex[region];
840    if (oldServer >= 0) {
841      colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary);
842    }
843    colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary);
844
845    // update for hosts
846    if (multiServersPerHost) {
847      updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost,
848        oldServer, newServer, primary, region);
849    }
850
851    // update for racks
852    if (numRacks > 1) {
853      updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
854        oldServer, newServer, primary, region);
855    }
856  }
857
858  /**
859   * Common method for per host and per Location region index updates when a region is moved.
860   * @param serverIndexToLocation             serverIndexToHostIndex or serverIndexToLocationIndex
861   * @param regionsPerLocation                regionsPerHost or regionsPerLocation
862   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
863   *                                          colocatedReplicaCountsPerRack
864   */
865  private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation,
866    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer,
867    int primary, int region) {
868    int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1;
869    int newLocation = serverIndexToLocation[newServer];
870    if (newLocation != oldLocation) {
871      regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region);
872      colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary);
873      if (oldLocation >= 0) {
874        regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region);
875        colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary);
876      }
877    }
878
879  }
880
881  int[] removeRegion(int[] regions, int regionIndex) {
882    // TODO: this maybe costly. Consider using linked lists
883    int[] newRegions = new int[regions.length - 1];
884    int i = 0;
885    for (i = 0; i < regions.length; i++) {
886      if (regions[i] == regionIndex) {
887        break;
888      }
889      newRegions[i] = regions[i];
890    }
891    System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
892    return newRegions;
893  }
894
895  int[] addRegion(int[] regions, int regionIndex) {
896    int[] newRegions = new int[regions.length + 1];
897    System.arraycopy(regions, 0, newRegions, 0, regions.length);
898    newRegions[newRegions.length - 1] = regionIndex;
899    return newRegions;
900  }
901
902  int[] addRegionSorted(int[] regions, int regionIndex) {
903    int[] newRegions = new int[regions.length + 1];
904    int i = 0;
905    for (i = 0; i < regions.length; i++) { // find the index to insert
906      if (regions[i] > regionIndex) {
907        break;
908      }
909    }
910    System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
911    System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
912    newRegions[i] = regionIndex;
913
914    return newRegions;
915  }
916
917  int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
918    int i = 0;
919    for (i = 0; i < regions.length; i++) {
920      if (regions[i] == regionIndex) {
921        regions[i] = newRegionIndex;
922        break;
923      }
924    }
925    return regions;
926  }
927
928  void sortServersByRegionCount() {
929    Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
930  }
931
932  int getNumRegions(int server) {
933    return regionsPerServer[server].length;
934  }
935
936  boolean contains(int[] arr, int val) {
937    return Arrays.binarySearch(arr, val) >= 0;
938  }
939
940  private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
941
942  public Comparator<Integer> getNumRegionsComparator() {
943    return numRegionsComparator;
944  }
945
946  int getLowestLocalityRegionOnServer(int serverIndex) {
947    if (regionFinder != null) {
948      float lowestLocality = 1.0f;
949      int lowestLocalityRegionIndex = -1;
950      if (regionsPerServer[serverIndex].length == 0) {
951        // No regions on that region server
952        return -1;
953      }
954      for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
955        int regionIndex = regionsPerServer[serverIndex][j];
956        HDFSBlocksDistribution distribution =
957          regionFinder.getBlockDistribution(regions[regionIndex]);
958        float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
959        // skip empty region
960        if (distribution.getUniqueBlocksTotalWeight() == 0) {
961          continue;
962        }
963        if (locality < lowestLocality) {
964          lowestLocality = locality;
965          lowestLocalityRegionIndex = j;
966        }
967      }
968      if (lowestLocalityRegionIndex == -1) {
969        return -1;
970      }
971      if (LOG.isTraceEnabled()) {
972        LOG.trace("Lowest locality region is "
973          + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
974            .getRegionNameAsString()
975          + " with locality " + lowestLocality + " and its region server contains "
976          + regionsPerServer[serverIndex].length + " regions");
977      }
978      return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
979    } else {
980      return -1;
981    }
982  }
983
984  float getLocalityOfRegion(int region, int server) {
985    if (regionFinder != null) {
986      HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
987      return distribution.getBlockLocalityIndex(servers[server].getHostname());
988    } else {
989      return 0f;
990    }
991  }
992
993  void setNumRegions(int numRegions) {
994    this.numRegions = numRegions;
995  }
996
997  void setNumMovedRegions(int numMovedRegions) {
998    this.numMovedRegions = numMovedRegions;
999  }
1000
1001  @Override
1002  public String toString() {
1003    StringBuilder desc = new StringBuilder("Cluster={servers=[");
1004    for (ServerName sn : servers) {
1005      desc.append(sn.getAddress().toString()).append(", ");
1006    }
1007    desc.append("], serverIndicesSortedByRegionCount=")
1008      .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
1009      .append(Arrays.deepToString(regionsPerServer));
1010
1011    desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
1012      .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
1013      .append('}');
1014    return desc.toString();
1015  }
1016}