001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.TimeUnit;
031import java.util.function.Supplier;
032import org.agrona.collections.Hashing;
033import org.agrona.collections.Int2IntCounterMap;
034import org.apache.hadoop.hbase.HDFSBlocksDistribution;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionReplicaUtil;
038import org.apache.hadoop.hbase.master.RackManager;
039import org.apache.hadoop.hbase.net.Address;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.util.Pair;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
047
/**
 * An efficient array-based implementation similar to ClusterState for keeping the status of the
 * cluster in terms of region assignment and distribution. Load balancers such as
 * StochasticLoadBalancer use this class because performing hundreds of thousands of hashmap
 * manipulations is very costly, which is why it relies mostly on indexes and arrays.
 * <p/>
 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
 * topology in terms of server names, hostnames and racks.
 */
057@InterfaceAudience.Private
058class BalancerClusterState {
059
  private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);

  // serverIndex -> ServerName. When several ServerNames share an address (hostname and port), the
  // one with the newest start code is kept. ServerName uniquely identifies a region server;
  // multiple region servers can run on the same host.
  ServerName[] servers;
  // hostIndex -> hostname
  String[] hosts;
  // rackIndex -> rack name
  String[] racks;
  boolean multiServersPerHost = false; // whether or not any host has more than one server

  // tableIndex -> table name, in the order tables are first encountered
  ArrayList<String> tables;
  // regionIndex -> region; assigned regions come first, followed by unassigned ones
  RegionInfo[] regions;
  // regionIndex -> load history of the region, or null when no load was supplied for it
  Deque<BalancerRegionLoad>[] regionLoads;
  private RegionHDFSBlockLocationFinder regionFinder;

  // regionIndex -> list of serverIndex sorted by locality (-1 for a location that is not a known
  // server); only filled when a RegionHDFSBlockLocationFinder is supplied
  int[][] regionLocations;

  int[] serverIndexToHostIndex; // serverIndex -> host index
  int[] serverIndexToRackIndex; // serverIndex -> rack index

  int[][] regionsPerServer; // serverIndex -> region list
  // serverIndex -> number of region slots already filled while building regionsPerServer
  int[] serverIndexToRegionsOffset;
  // hostIndex -> list of regions; populated by the constructor only when some host runs more
  // than one server
  int[][] regionsPerHost;
  // rackIndex -> region list; populated by the constructor only when there is more than one rack
  int[][] regionsPerRack;
  // serverIndex -> counts of colocated replicas by primary region index
  Int2IntCounterMap[] colocatedReplicaCountsPerServer;
  // hostIndex -> counts of colocated replicas by primary region index (populated like
  // regionsPerHost)
  Int2IntCounterMap[] colocatedReplicaCountsPerHost;
  // rackIndex -> counts of colocated replicas by primary region index (populated like
  // regionsPerRack)
  Int2IntCounterMap[] colocatedReplicaCountsPerRack;

  int[][] serversPerHost; // hostIndex -> list of server indexes
  int[][] serversPerRack; // rackIndex -> list of server indexes
  int[] regionIndexToServerIndex; // regionIndex -> serverIndex (-1 when unassigned)
  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
  int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
  int[] numRegionsPerTable; // tableIndex -> count of assigned regions
  double[] meanRegionsPerTable; // tableIndex -> mean region count per server
  // regionIndex -> regionIndex of the primary (-1 if the primary is not part of this state)
  int[] regionIndexToPrimaryIndex;
  boolean hasRegionReplicas = false; // whether there are regions with replicas

  // Both initialized to the identity permutation by the constructor; presumably re-sorted
  // elsewhere in this class (not visible here)
  Integer[] serverIndicesSortedByRegionCount;
  Integer[] serverIndicesSortedByLocality;

  // Address (hostname and port) -> serverIndex; ServerNames differing only in start code share
  // an index
  Map<Address, Integer> serversToIndex;
  Map<String, Integer> hostsToIndex; // hostname -> hostIndex
  Map<String, Integer> racksToIndex; // rack name -> rackIndex
  Map<String, Integer> tablesToIndex; // table name -> tableIndex
  Map<RegionInfo, Integer> regionsToIndex; // region -> regionIndex
  // serverIndex -> locality; allocated but not filled by the constructor
  float[] localityPerServer;

  int numServers;
  int numHosts;
  int numRacks;
  int numTables;
  int numRegions; // assigned plus unassigned regions
  int maxReplicas = 1; // highest replica id seen plus one

  int numMovedRegions = 0; // num moved regions from the initial configuration
  // the region assignment this state was built from
  Map<ServerName, List<RegionInfo>> clusterState;

  private final RackManager rackManager;
  // Maps region -> rackIndex -> locality of region on rack; computed lazily
  private float[][] rackLocalities;
  // Maps localityType ordinal -> region -> [server|rack]Index with highest locality; computed
  // lazily together with rackLocalities
  private int[][] regionsToMostLocalEntities;
  // Maps (regionIndex, serverIndex) -> cache ratio of the region on that server. Only non-zero
  // ratios and the currently hosting server are recorded; computed lazily.
  private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
  // Maps regionIndex -> serverIndex with best region cache ratio
  private int[] regionServerIndexWithBestRegionCachedRatio;
  // Maps encoded region name -> (old ServerName, cache ratio of the region on that old server)
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;

  // A shuffled copy of all server indices, memoized and recomputed at most every 5 seconds
  private final Supplier<List<Integer>> shuffledServerIndicesSupplier =
    Suppliers.memoizeWithExpiration(() -> {
      Collection<Integer> serverIndices = serversToIndex.values();
      List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices);
      Collections.shuffle(shuffledServerIndices);
      return shuffledServerIndices;
    }, 5, TimeUnit.SECONDS);
  // Long.MAX_VALUE until set; presumably updated when a stop is requested (setter not in view)
  private long stopRequestedAt = Long.MAX_VALUE;
140
141  static class DefaultRackManager extends RackManager {
142    @Override
143    public String getRack(ServerName server) {
144      return UNKNOWN_RACK;
145    }
146  }
147
148  BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
149    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
150    RackManager rackManager) {
151    this(null, clusterState, loads, regionFinder, rackManager, null);
152  }
153
154  protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
155    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
156    RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
157    this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
158  }
159
160  @SuppressWarnings("unchecked")
161  BalancerClusterState(Collection<RegionInfo> unassignedRegions,
162    Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
163    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
164    Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
165    if (unassignedRegions == null) {
166      unassignedRegions = Collections.emptyList();
167    }
168
169    serversToIndex = new HashMap<>();
170    hostsToIndex = new HashMap<>();
171    racksToIndex = new HashMap<>();
172    tablesToIndex = new HashMap<>();
173
174    // TODO: We should get the list of tables from master
175    tables = new ArrayList<>();
176    this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
177
178    this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;
179
180    numRegions = 0;
181
182    List<List<Integer>> serversPerHostList = new ArrayList<>();
183    List<List<Integer>> serversPerRackList = new ArrayList<>();
184    this.clusterState = clusterState;
185    this.regionFinder = regionFinder;
186
187    // Use servername and port as there can be dead servers in this list. We want everything with
188    // a matching hostname and port to have the same index.
189    for (ServerName sn : clusterState.keySet()) {
190      if (sn == null) {
191        LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); "
192          + "skipping; unassigned regions?");
193        if (LOG.isTraceEnabled()) {
194          LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
195        }
196        continue;
197      }
198      if (serversToIndex.get(sn.getAddress()) == null) {
199        serversToIndex.put(sn.getAddress(), numServers++);
200      }
201      if (!hostsToIndex.containsKey(sn.getHostname())) {
202        hostsToIndex.put(sn.getHostname(), numHosts++);
203        serversPerHostList.add(new ArrayList<>(1));
204      }
205
206      int serverIndex = serversToIndex.get(sn.getAddress());
207      int hostIndex = hostsToIndex.get(sn.getHostname());
208      serversPerHostList.get(hostIndex).add(serverIndex);
209
210      String rack = this.rackManager.getRack(sn);
211
212      if (!racksToIndex.containsKey(rack)) {
213        racksToIndex.put(rack, numRacks++);
214        serversPerRackList.add(new ArrayList<>());
215      }
216      int rackIndex = racksToIndex.get(rack);
217      serversPerRackList.get(rackIndex).add(serverIndex);
218    }
219
220    LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);
221    // Count how many regions there are.
222    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
223      numRegions += entry.getValue().size();
224    }
225    numRegions += unassignedRegions.size();
226
227    regionsToIndex = new HashMap<>(numRegions);
228    servers = new ServerName[numServers];
229    serversPerHost = new int[numHosts][];
230    serversPerRack = new int[numRacks][];
231    regions = new RegionInfo[numRegions];
232    regionIndexToServerIndex = new int[numRegions];
233    initialRegionIndexToServerIndex = new int[numRegions];
234    regionIndexToTableIndex = new int[numRegions];
235    regionIndexToPrimaryIndex = new int[numRegions];
236    regionLoads = new Deque[numRegions];
237
238    regionLocations = new int[numRegions][];
239    serverIndicesSortedByRegionCount = new Integer[numServers];
240    serverIndicesSortedByLocality = new Integer[numServers];
241    localityPerServer = new float[numServers];
242
243    serverIndexToHostIndex = new int[numServers];
244    serverIndexToRackIndex = new int[numServers];
245    regionsPerServer = new int[numServers][];
246    serverIndexToRegionsOffset = new int[numServers];
247    regionsPerHost = new int[numHosts][];
248    regionsPerRack = new int[numRacks][];
249    colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers];
250    colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts];
251    colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks];
252
253    int regionIndex = 0, regionPerServerIndex = 0;
254
255    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
256      if (entry.getKey() == null) {
257        LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
258        continue;
259      }
260      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
261
262      // keep the servername if this is the first server name for this hostname
263      // or this servername has the newest startcode.
264      if (
265        servers[serverIndex] == null
266          || servers[serverIndex].getStartcode() < entry.getKey().getStartcode()
267      ) {
268        servers[serverIndex] = entry.getKey();
269      }
270
271      if (regionsPerServer[serverIndex] != null) {
272        // there is another server with the same hostAndPort in ClusterState.
273        // allocate the array for the total size
274        regionsPerServer[serverIndex] =
275          new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
276      } else {
277        regionsPerServer[serverIndex] = new int[entry.getValue().size()];
278      }
279      colocatedReplicaCountsPerServer[serverIndex] =
280        new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
281      serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
282      serverIndicesSortedByLocality[serverIndex] = serverIndex;
283    }
284
285    hosts = new String[numHosts];
286    for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) {
287      hosts[entry.getValue()] = entry.getKey();
288    }
289    racks = new String[numRacks];
290    for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) {
291      racks[entry.getValue()] = entry.getKey();
292    }
293
294    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
295      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
296      regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
297
298      int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
299      serverIndexToHostIndex[serverIndex] = hostIndex;
300
301      int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
302      serverIndexToRackIndex[serverIndex] = rackIndex;
303
304      for (RegionInfo region : entry.getValue()) {
305        registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
306        regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
307        regionIndex++;
308      }
309      serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
310    }
311
312    for (RegionInfo region : unassignedRegions) {
313      registerRegion(region, regionIndex, -1, loads, regionFinder);
314      regionIndex++;
315    }
316
317    if (LOG.isTraceEnabled()) {
318      for (int i = 0; i < numServers; i++) {
319        LOG.trace("server {} has {} regions", i, regionsPerServer[i].length);
320      }
321    }
322    for (int i = 0; i < serversPerHostList.size(); i++) {
323      serversPerHost[i] = new int[serversPerHostList.get(i).size()];
324      for (int j = 0; j < serversPerHost[i].length; j++) {
325        serversPerHost[i][j] = serversPerHostList.get(i).get(j);
326        LOG.trace("server {} is on host {}", serversPerHostList.get(i).get(j), i);
327      }
328      if (serversPerHost[i].length > 1) {
329        multiServersPerHost = true;
330      }
331    }
332
333    for (int i = 0; i < serversPerRackList.size(); i++) {
334      serversPerRack[i] = new int[serversPerRackList.get(i).size()];
335      for (int j = 0; j < serversPerRack[i].length; j++) {
336        serversPerRack[i][j] = serversPerRackList.get(i).get(j);
337        LOG.trace("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
338      }
339    }
340
341    numTables = tables.size();
342    LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts,
343      numRacks);
344    numRegionsPerServerPerTable = new int[numTables][numServers];
345    numRegionsPerTable = new int[numTables];
346
347    for (int i = 0; i < numTables; i++) {
348      for (int j = 0; j < numServers; j++) {
349        numRegionsPerServerPerTable[i][j] = 0;
350      }
351    }
352
353    for (int i = 0; i < regionIndexToServerIndex.length; i++) {
354      if (regionIndexToServerIndex[i] >= 0) {
355        numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++;
356        numRegionsPerTable[regionIndexToTableIndex[i]]++;
357      }
358    }
359
360    // Avoid repeated computation for planning
361    meanRegionsPerTable = new double[numTables];
362
363    for (int i = 0; i < numTables; i++) {
364      meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
365    }
366
367    for (int i = 0; i < regions.length; i++) {
368      RegionInfo info = regions[i];
369      if (RegionReplicaUtil.isDefaultReplica(info)) {
370        regionIndexToPrimaryIndex[i] = i;
371      } else {
372        hasRegionReplicas = true;
373        RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
374        regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
375      }
376    }
377
378    for (int i = 0; i < regionsPerServer.length; i++) {
379      colocatedReplicaCountsPerServer[i] =
380        new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
381      for (int j = 0; j < regionsPerServer[i].length; j++) {
382        int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
383        colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex);
384      }
385    }
386    // compute regionsPerHost
387    if (multiServersPerHost) {
388      populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost,
389        serversPerHost);
390    }
391
392    // compute regionsPerRack
393    if (numRacks > 1) {
394      populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
395        serversPerRack);
396    }
397  }
398
399  private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
400    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) {
401    for (int i = 0; i < serversPerLocation.length; i++) {
402      int numRegionsPerLocation = 0;
403      for (int j = 0; j < serversPerLocation[i].length; j++) {
404        numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length;
405      }
406      regionsPerLocation[i] = new int[numRegionsPerLocation];
407      colocatedReplicaCountsPerLocation[i] =
408        new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0);
409    }
410
411    for (int i = 0; i < serversPerLocation.length; i++) {
412      int numRegionPerLocationIndex = 0;
413      for (int j = 0; j < serversPerLocation[i].length; j++) {
414        for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) {
415          int region = regionsPerServer[serversPerLocation[i][j]][k];
416          regionsPerLocation[i][numRegionPerLocationIndex] = region;
417          int primaryIndex = regionIndexToPrimaryIndex[region];
418          colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex);
419          numRegionPerLocationIndex++;
420        }
421      }
422    }
423
424  }
425
426  /** Helper for Cluster constructor to handle a region */
427  private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
428    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) {
429    String tableName = region.getTable().getNameAsString();
430    if (!tablesToIndex.containsKey(tableName)) {
431      tables.add(tableName);
432      tablesToIndex.put(tableName, tablesToIndex.size());
433    }
434    int tableIndex = tablesToIndex.get(tableName);
435
436    regionsToIndex.put(region, regionIndex);
437    regions[regionIndex] = region;
438    regionIndexToServerIndex[regionIndex] = serverIndex;
439    initialRegionIndexToServerIndex[regionIndex] = serverIndex;
440    regionIndexToTableIndex[regionIndex] = tableIndex;
441
442    // region load
443    if (loads != null) {
444      Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
445      // That could have failed if the RegionLoad is using the other regionName
446      if (rl == null) {
447        // Try getting the region load using encoded name.
448        rl = loads.get(region.getEncodedName());
449      }
450      regionLoads[regionIndex] = rl;
451    }
452
453    if (regionFinder != null) {
454      // region location
455      List<ServerName> loc = regionFinder.getTopBlockLocations(region);
456      regionLocations[regionIndex] = new int[loc.size()];
457      for (int i = 0; i < loc.size(); i++) {
458        regionLocations[regionIndex][i] = loc.get(i) == null
459          ? -1
460          : (serversToIndex.get(loc.get(i).getAddress()) == null
461            ? -1
462            : serversToIndex.get(loc.get(i).getAddress()));
463      }
464    }
465
466    int numReplicas = region.getReplicaId() + 1;
467    if (numReplicas > maxReplicas) {
468      maxReplicas = numReplicas;
469    }
470  }
471
472  /**
473   * Returns true iff a given server has less regions than the balanced amount
474   */
475  public boolean serverHasTooFewRegions(int server) {
476    int minLoad = this.numRegions / numServers;
477    int numRegions = getNumRegions(server);
478    return numRegions < minLoad;
479  }
480
481  /**
482   * Retrieves and lazily initializes a field storing the locality of every region/server
483   * combination
484   */
485  public float[][] getOrComputeRackLocalities() {
486    if (rackLocalities == null || regionsToMostLocalEntities == null) {
487      computeCachedLocalities();
488    }
489    return rackLocalities;
490  }
491
492  /**
493   * Lazily initializes and retrieves a mapping of region -> server for which region has the highest
494   * the locality
495   */
496  public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) {
497    if (rackLocalities == null || regionsToMostLocalEntities == null) {
498      computeCachedLocalities();
499    }
500    return regionsToMostLocalEntities[type.ordinal()];
501  }
502
503  /**
504   * Looks up locality from cache of localities. Will create cache if it does not already exist.
505   */
506  public float getOrComputeLocality(int region, int entity,
507    BalancerClusterState.LocalityType type) {
508    switch (type) {
509      case SERVER:
510        return getLocalityOfRegion(region, entity);
511      case RACK:
512        return getOrComputeRackLocalities()[region][entity];
513      default:
514        throw new IllegalArgumentException("Unsupported LocalityType: " + type);
515    }
516  }
517
518  /**
519   * Returns locality weighted by region size in MB. Will create locality cache if it does not
520   * already exist.
521   */
522  public double getOrComputeWeightedLocality(int region, int server,
523    BalancerClusterState.LocalityType type) {
524    return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
525  }
526
527  /**
528   * Returns the size in MB from the most recent RegionLoad for region
529   */
530  public int getRegionSizeMB(int region) {
531    Deque<BalancerRegionLoad> load = regionLoads[region];
532    // This means regions have no actual data on disk
533    if (load == null) {
534      return 0;
535    }
536    return regionLoads[region].getLast().getStorefileSizeMB();
537  }
538
539  /**
540   * Computes and caches the locality for each region/rack combinations, as well as storing a
541   * mapping of region -> server and region -> rack such that server and rack have the highest
542   * locality for region
543   */
544  private void computeCachedLocalities() {
545    rackLocalities = new float[numRegions][numRacks];
546    regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
547
548    // Compute localities and find most local server per region
549    for (int region = 0; region < numRegions; region++) {
550      int serverWithBestLocality = 0;
551      float bestLocalityForRegion = 0;
552      for (int server = 0; server < numServers; server++) {
553        // Aggregate per-rack locality
554        float locality = getLocalityOfRegion(region, server);
555        int rack = serverIndexToRackIndex[server];
556        int numServersInRack = serversPerRack[rack].length;
557        rackLocalities[region][rack] += locality / numServersInRack;
558
559        if (locality > bestLocalityForRegion) {
560          serverWithBestLocality = server;
561          bestLocalityForRegion = locality;
562        }
563      }
564      regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
565
566      // Find most local rack per region
567      int rackWithBestLocality = 0;
568      float bestRackLocalityForRegion = 0.0f;
569      for (int rack = 0; rack < numRacks; rack++) {
570        float rackLocality = rackLocalities[region][rack];
571        if (rackLocality > bestRackLocalityForRegion) {
572          bestRackLocalityForRegion = rackLocality;
573          rackWithBestLocality = rack;
574        }
575      }
576      regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
577    }
578
579  }
580
581  /**
582   * Returns the size of hFiles from the most recent RegionLoad for region
583   */
584  public int getTotalRegionHFileSizeMB(int region) {
585    Deque<BalancerRegionLoad> load = regionLoads[region];
586    if (load == null) {
587      // This means, that the region has no actual data on disk
588      return 0;
589    }
590    return regionLoads[region].getLast().getRegionSizeMB();
591  }
592
593  /**
594   * Returns the weighted cache ratio of a region on the given region server
595   */
596  public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
597    return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server);
598  }
599
600  /**
601   * Returns the amount by which a region is cached on a given region server. If the region is not
602   * currently hosted on the given region server, then find out if it was previously hosted there
603   * and return the old cache ratio.
604   */
605  protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
606    float regionCacheRatio = 0.0f;
607
608    // Get the current region cache ratio if the region is hosted on the server regionServerIndex
609    for (int regionIndex : regionsPerServer[regionServerIndex]) {
610      if (region != regionIndex) {
611        continue;
612      }
613
614      Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];
615
616      // The region is currently hosted on this region server. Get the region cache ratio for this
617      // region on this server
618      regionCacheRatio =
619        regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio();
620
621      return regionCacheRatio;
622    }
623
624    // Region is not currently hosted on this server. Check if the region was cached on this
625    // server earlier. This can happen when the server was shutdown and the cache was persisted.
626    // Search using the region name and server name and not the index id and server id as these ids
627    // may change when a server is marked as dead or a new server is added.
628    String regionEncodedName = regions[region].getEncodedName();
629    ServerName serverName = servers[regionServerIndex];
630    if (
631      regionCacheRatioOnOldServerMap != null
632        && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
633    ) {
634      Pair<ServerName, Float> cacheRatioOfRegionOnServer =
635        regionCacheRatioOnOldServerMap.get(regionEncodedName);
636      if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) {
637        regionCacheRatio = cacheRatioOfRegionOnServer.getSecond();
638        if (LOG.isDebugEnabled()) {
639          LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName,
640            serverName, regionCacheRatio);
641        }
642      }
643    }
644    return regionCacheRatio;
645  }
646
647  /**
648   * Populate the maps containing information about how much a region is cached on a region server.
649   */
650  private void computeRegionServerRegionCacheRatio() {
651    regionIndexServerIndexRegionCachedRatio = new HashMap<>();
652    regionServerIndexWithBestRegionCachedRatio = new int[numRegions];
653
654    for (int region = 0; region < numRegions; region++) {
655      float bestRegionCacheRatio = 0.0f;
656      int serverWithBestRegionCacheRatio = 0;
657      for (int server = 0; server < numServers; server++) {
658        float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server);
659        if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) {
660          // A region with cache ratio 0 on a server means nothing. Hence, just make a note of
661          // cache ratio only if the cache ratio is greater than 0.
662          Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
663          regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio);
664        }
665        if (regionCacheRatio > bestRegionCacheRatio) {
666          serverWithBestRegionCacheRatio = server;
667          // If the server currently hosting the region has equal cache ratio to a historical
668          // server, consider the current server to keep hosting the region
669          bestRegionCacheRatio = regionCacheRatio;
670        } else if (
671          regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region]
672        ) {
673          // If two servers have same region cache ratio, then the server currently hosting the
674          // region
675          // should retain the region
676          serverWithBestRegionCacheRatio = server;
677        }
678      }
679      regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio;
680      Pair<Integer, Integer> regionServerPair =
681        new Pair<>(region, regionIndexToServerIndex[region]);
682      float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair);
683      if (tempRegionCacheRatio > bestRegionCacheRatio) {
684        LOG.warn(
685          "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the "
686            + "best region cache ratio {} on server {}",
687          regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
688          tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]);
689      }
690    }
691  }
692
693  protected float getOrComputeRegionCacheRatio(int region, int server) {
694    if (
695      regionServerIndexWithBestRegionCachedRatio == null
696        || regionIndexServerIndexRegionCachedRatio.isEmpty()
697    ) {
698      computeRegionServerRegionCacheRatio();
699    }
700
701    Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
702    return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair)
703      ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair)
704      : 0.0f;
705  }
706
707  public int[] getOrComputeServerWithBestRegionCachedRatio() {
708    if (
709      regionServerIndexWithBestRegionCachedRatio == null
710        || regionIndexServerIndexRegionCachedRatio.isEmpty()
711    ) {
712      computeRegionServerRegionCacheRatio();
713    }
714    return regionServerIndexWithBestRegionCachedRatio;
715  }
716
717  /**
718   * Maps region index to rack index
719   */
720  public int getRackForRegion(int region) {
721    return serverIndexToRackIndex[regionIndexToServerIndex[region]];
722  }
723
724  enum LocalityType {
725    SERVER,
726    RACK
727  }
728
729  public void doAction(BalanceAction action) {
730    switch (action.getType()) {
731      case NULL:
732        break;
733      case ASSIGN_REGION:
734        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
735        assert action instanceof AssignRegionAction : action.getClass();
736        AssignRegionAction ar = (AssignRegionAction) action;
737        regionsPerServer[ar.getServer()] =
738          addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
739        regionMoved(ar.getRegion(), -1, ar.getServer());
740        break;
741      case MOVE_REGION:
742        assert action instanceof MoveRegionAction : action.getClass();
743        MoveRegionAction mra = (MoveRegionAction) action;
744        regionsPerServer[mra.getFromServer()] =
745          removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
746        regionsPerServer[mra.getToServer()] =
747          addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
748        regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
749        break;
750      case SWAP_REGIONS:
751        assert action instanceof SwapRegionsAction : action.getClass();
752        SwapRegionsAction a = (SwapRegionsAction) action;
753        regionsPerServer[a.getFromServer()] =
754          replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
755        regionsPerServer[a.getToServer()] =
756          replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
757        regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
758        regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
759        break;
760      case MOVE_BATCH:
761        assert action instanceof MoveBatchAction : action.getClass();
762        MoveBatchAction mba = (MoveBatchAction) action;
763        for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) {
764          Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex);
765          regionsPerServer[serverIndex] =
766            removeRegions(regionsPerServer[serverIndex], regionsToRemove);
767        }
768        for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) {
769          Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex);
770          regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd);
771        }
772        for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
773          regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(),
774            moveRegionAction.getToServer());
775        }
776        break;
777      default:
778        throw new RuntimeException("Unknown action:" + action.getType());
779    }
780  }
781
782  /**
783   * Return true if the placement of region on server would lower the availability of the region in
784   * question
785   * @return true or false
786   */
787  boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
788    if (!serversToIndex.containsKey(serverName.getAddress())) {
789      return false; // safeguard against race between cluster.servers and servers from LB method
790      // args
791    }
792    int server = serversToIndex.get(serverName.getAddress());
793    int region = regionsToIndex.get(regionInfo);
794
795    // Region replicas for same region should better assign to different servers
796    for (int i : regionsPerServer[server]) {
797      RegionInfo otherRegionInfo = regions[i];
798      if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
799        return true;
800      }
801    }
802
803    int primary = regionIndexToPrimaryIndex[region];
804    if (primary == -1) {
805      return false;
806    }
807    // there is a subset relation for server < host < rack
808    // check server first
809    int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary);
810    if (result != 0) {
811      return result > 0;
812    }
813
814    // check host
815    if (multiServersPerHost) {
816      result = checkLocationForPrimary(serverIndexToHostIndex[server],
817        colocatedReplicaCountsPerHost, primary);
818      if (result != 0) {
819        return result > 0;
820      }
821    }
822
823    // check rack
824    if (numRacks > 1) {
825      result = checkLocationForPrimary(serverIndexToRackIndex[server],
826        colocatedReplicaCountsPerRack, primary);
827      if (result != 0) {
828        return result > 0;
829      }
830    }
831    return false;
832  }
833
834  /**
835   * Common method for better solution check.
836   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
837   *                                          colocatedReplicaCountsPerRack
838   * @return 1 for better, -1 for no better, 0 for unknown
839   */
840  private int checkLocationForPrimary(int location,
841    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) {
842    if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) {
843      // check for whether there are other Locations that we can place this region
844      for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) {
845        if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) {
846          return 1; // meaning there is a better Location
847        }
848      }
849      return -1; // there is not a better Location to place this
850    }
851    return 0;
852  }
853
854  void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
855    if (!serversToIndex.containsKey(serverName.getAddress())) {
856      return;
857    }
858    int server = serversToIndex.get(serverName.getAddress());
859    int region = regionsToIndex.get(regionInfo);
860    doAction(new AssignRegionAction(region, server));
861  }
862
  /**
   * Updates the derived bookkeeping after {@code region} has been placed on {@code newServer}.
   * <p>
   * The following state is updated:
   * <ul>
   * <li>the region-to-server mapping;</li>
   * <li>the moved-region counter, relative to the initial placement;</li>
   * <li>the per-table per-server region counts;</li>
   * <li>the colocated replica counts per server;</li>
   * <li>the per-host region lists and counts, only when {@code multiServersPerHost};</li>
   * <li>the per-rack region lists and counts, only when {@code numRacks > 1}.</li>
   * </ul>
   * This method does not touch {@code regionsPerServer}; {@code doAction} updates those arrays
   * before calling here.
   * @param region    index of the region that was placed
   * @param oldServer index of the server the region left, or a negative value when it had no
   *                  previous server ({@code doAction} passes -1 for assignments)
   * @param newServer index of the server now hosting the region
   */
  void regionMoved(int region, int oldServer, int newServer) {
    regionIndexToServerIndex[region] = newServer;
    // NOTE(review): this branch also decrements when oldServer == newServer == initial server;
    // presumably callers never "move" a region onto its current server - confirm.
    if (initialRegionIndexToServerIndex[region] == newServer) {
      numMovedRegions--; // region moved back to original location
    } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
      numMovedRegions++; // region moved from original location
    }
    int tableIndex = regionIndexToTableIndex[region];
    if (oldServer >= 0) {
      numRegionsPerServerPerTable[tableIndex][oldServer]--;
    }
    numRegionsPerServerPerTable[tableIndex][newServer]++;

    // update for servers: counts are keyed by the primary region index
    int primary = regionIndexToPrimaryIndex[region];
    if (oldServer >= 0) {
      colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary);
    }
    colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary);

    // update for hosts (host tables are only maintained when several servers share a host)
    if (multiServersPerHost) {
      updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost,
        oldServer, newServer, primary, region);
    }

    // update for racks (rack tables are only maintained when there is more than one rack)
    if (numRacks > 1) {
      updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
        oldServer, newServer, primary, region);
    }
  }
895
896  /**
897   * Common method for per host and per Location region index updates when a region is moved.
898   * @param serverIndexToLocation             serverIndexToHostIndex or serverIndexToLocationIndex
899   * @param regionsPerLocation                regionsPerHost or regionsPerLocation
900   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
901   *                                          colocatedReplicaCountsPerRack
902   */
903  private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation,
904    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer,
905    int primary, int region) {
906    int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1;
907    int newLocation = serverIndexToLocation[newServer];
908    if (newLocation != oldLocation) {
909      regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region);
910      colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary);
911      if (oldLocation >= 0) {
912        regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region);
913        colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary);
914      }
915    }
916
917  }
918
919  int[] removeRegion(int[] regions, int regionIndex) {
920    // TODO: this maybe costly. Consider using linked lists
921    int[] newRegions = new int[regions.length - 1];
922    int i = 0;
923    for (i = 0; i < regions.length; i++) {
924      if (regions[i] == regionIndex) {
925        break;
926      }
927      newRegions[i] = regions[i];
928    }
929    System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
930    return newRegions;
931  }
932
933  int[] addRegion(int[] regions, int regionIndex) {
934    int[] newRegions = new int[regions.length + 1];
935    System.arraycopy(regions, 0, newRegions, 0, regions.length);
936    newRegions[newRegions.length - 1] = regionIndex;
937    return newRegions;
938  }
939
940  int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) {
941    // Calculate the size of the new regions array
942    int newSize = regions.length - regionIndicesToRemove.size();
943    if (newSize < 0) {
944      throw new IllegalStateException(
945        "Region indices mismatch: more regions to remove than in the regions array");
946    }
947
948    int[] newRegions = new int[newSize];
949    int newIndex = 0;
950
951    // Copy only the regions not in the removal set
952    for (int region : regions) {
953      if (!regionIndicesToRemove.contains(region)) {
954        newRegions[newIndex++] = region;
955      }
956    }
957
958    // If the newIndex is smaller than newSize, some regions were missing from the input array
959    if (newIndex != newSize) {
960      throw new IllegalStateException("Region indices mismatch: some regions in the removal "
961        + "set were not found in the regions array");
962    }
963
964    return newRegions;
965  }
966
967  int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) {
968    int[] newRegions = new int[regions.length + regionIndicesToAdd.size()];
969
970    // Copy the existing regions to the new array
971    System.arraycopy(regions, 0, newRegions, 0, regions.length);
972
973    // Add the new regions at the end of the array
974    int newIndex = regions.length;
975    for (int regionIndex : regionIndicesToAdd) {
976      newRegions[newIndex++] = regionIndex;
977    }
978
979    return newRegions;
980  }
981
982  List<Integer> getShuffledServerIndices() {
983    return shuffledServerIndicesSupplier.get();
984  }
985
986  int[] addRegionSorted(int[] regions, int regionIndex) {
987    int[] newRegions = new int[regions.length + 1];
988    int i = 0;
989    for (i = 0; i < regions.length; i++) { // find the index to insert
990      if (regions[i] > regionIndex) {
991        break;
992      }
993    }
994    System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
995    System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
996    newRegions[i] = regionIndex;
997
998    return newRegions;
999  }
1000
1001  int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
1002    int i = 0;
1003    for (i = 0; i < regions.length; i++) {
1004      if (regions[i] == regionIndex) {
1005        regions[i] = newRegionIndex;
1006        break;
1007      }
1008    }
1009    return regions;
1010  }
1011
1012  void sortServersByRegionCount() {
1013    Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
1014  }
1015
1016  int getNumRegions(int server) {
1017    return regionsPerServer[server].length;
1018  }
1019
1020  boolean contains(int[] arr, int val) {
1021    return Arrays.binarySearch(arr, val) >= 0;
1022  }
1023
1024  private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
1025
1026  public Comparator<Integer> getNumRegionsComparator() {
1027    return numRegionsComparator;
1028  }
1029
1030  int getLowestLocalityRegionOnServer(int serverIndex) {
1031    if (regionFinder != null) {
1032      float lowestLocality = 1.0f;
1033      int lowestLocalityRegionIndex = -1;
1034      if (regionsPerServer[serverIndex].length == 0) {
1035        // No regions on that region server
1036        return -1;
1037      }
1038      for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
1039        int regionIndex = regionsPerServer[serverIndex][j];
1040        HDFSBlocksDistribution distribution =
1041          regionFinder.getBlockDistribution(regions[regionIndex]);
1042        float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
1043        // skip empty region
1044        if (distribution.getUniqueBlocksTotalWeight() == 0) {
1045          continue;
1046        }
1047        if (locality < lowestLocality) {
1048          lowestLocality = locality;
1049          lowestLocalityRegionIndex = j;
1050        }
1051      }
1052      if (lowestLocalityRegionIndex == -1) {
1053        return -1;
1054      }
1055      if (LOG.isTraceEnabled()) {
1056        LOG.trace("Lowest locality region is "
1057          + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
1058            .getRegionNameAsString()
1059          + " with locality " + lowestLocality + " and its region server contains "
1060          + regionsPerServer[serverIndex].length + " regions");
1061      }
1062      return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
1063    } else {
1064      return -1;
1065    }
1066  }
1067
1068  float getLocalityOfRegion(int region, int server) {
1069    if (regionFinder != null) {
1070      HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
1071      return distribution.getBlockLocalityIndex(servers[server].getHostname());
1072    } else {
1073      return 0f;
1074    }
1075  }
1076
1077  void setNumRegions(int numRegions) {
1078    this.numRegions = numRegions;
1079  }
1080
1081  void setNumMovedRegions(int numMovedRegions) {
1082    this.numMovedRegions = numMovedRegions;
1083  }
1084
1085  public int getMaxReplicas() {
1086    return maxReplicas;
1087  }
1088
1089  void setStopRequestedAt(long stopRequestedAt) {
1090    this.stopRequestedAt = stopRequestedAt;
1091  }
1092
1093  boolean isStopRequested() {
1094    return EnvironmentEdgeManager.currentTime() > stopRequestedAt;
1095  }
1096
1097  @Override
1098  public String toString() {
1099    StringBuilder desc = new StringBuilder("Cluster={servers=[");
1100    for (ServerName sn : servers) {
1101      desc.append(sn.getAddress().toString()).append(", ");
1102    }
1103    desc.append("], serverIndicesSortedByRegionCount=")
1104      .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
1105      .append(Arrays.deepToString(regionsPerServer));
1106
1107    desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
1108      .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
1109      .append('}');
1110    return desc.toString();
1111  }
1112}