001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import org.agrona.collections.Hashing;
030import org.agrona.collections.Int2IntCounterMap;
031import org.apache.hadoop.hbase.HDFSBlocksDistribution;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.RegionReplicaUtil;
035import org.apache.hadoop.hbase.master.RackManager;
036import org.apache.hadoop.hbase.net.Address;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * An efficient, array-based implementation similar to ClusterState for keeping the status of the
043 * cluster in terms of region assignment and distribution. LoadBalancers such as
044 * StochasticLoadBalancer use this Cluster object because hundreds of thousands of hashmap
045 * manipulations are very costly, which is why this class uses mostly indexes and arrays.
046 * <p/>
047 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
048 * topology in terms of server names, hostnames and racks.
049 */
050@InterfaceAudience.Private
051class BalancerClusterState {
052
053  private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);
054
055  ServerName[] servers;
056  // ServerName uniquely identifies a region server. Multiple RS can run on the same host.
057  String[] hosts;
058  String[] racks;
059  boolean multiServersPerHost = false; // whether or not any host has more than one server
060
061  ArrayList<String> tables;
062  RegionInfo[] regions;
063  Deque<BalancerRegionLoad>[] regionLoads;
064  private RegionHDFSBlockLocationFinder regionFinder;
065
066  int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality
067
068  int[] serverIndexToHostIndex; // serverIndex -> host index
069  int[] serverIndexToRackIndex; // serverIndex -> rack index
070
071  int[][] regionsPerServer; // serverIndex -> region list
072  int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
073  int[][] regionsPerHost; // hostIndex -> list of regions
074  int[][] regionsPerRack; // rackIndex -> region list
075  Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated
076  // replicas by primary region index
077  Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by
078  // primary region index
079  Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by
080  // primary region index
081
082  int[][] serversPerHost; // hostIndex -> list of server indexes
083  int[][] serversPerRack; // rackIndex -> list of server indexes
084  int[] regionIndexToServerIndex; // regionIndex -> serverIndex
085  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
086  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
087  int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
088  int[] numRegionsPerTable; // tableIndex -> region count
089  double[] meanRegionsPerTable; // mean region count per table
090  int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
091  boolean hasRegionReplicas = false; // whether there are regions with replicas
092
093  Integer[] serverIndicesSortedByRegionCount;
094  Integer[] serverIndicesSortedByLocality;
095
096  Map<Address, Integer> serversToIndex;
097  Map<String, Integer> hostsToIndex;
098  Map<String, Integer> racksToIndex;
099  Map<String, Integer> tablesToIndex;
100  Map<RegionInfo, Integer> regionsToIndex;
101  float[] localityPerServer;
102
103  int numServers;
104  int numHosts;
105  int numRacks;
106  int numTables;
107  int numRegions;
108
109  int numMovedRegions = 0; // num moved regions from the initial configuration
110  Map<ServerName, List<RegionInfo>> clusterState;
111
112  private final RackManager rackManager;
113  // Maps region -> rackIndex -> locality of region on rack
114  private float[][] rackLocalities;
115  // Maps localityType -> region -> [server|rack]Index with highest locality
116  private int[][] regionsToMostLocalEntities;
117
118  static class DefaultRackManager extends RackManager {
119    @Override
120    public String getRack(ServerName server) {
121      return UNKNOWN_RACK;
122    }
123  }
124
125  BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
126    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
127    RackManager rackManager) {
128    this(null, clusterState, loads, regionFinder, rackManager);
129  }
130
131  @SuppressWarnings("unchecked")
132  BalancerClusterState(Collection<RegionInfo> unassignedRegions,
133    Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
134    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) {
135    if (unassignedRegions == null) {
136      unassignedRegions = Collections.emptyList();
137    }
138
139    serversToIndex = new HashMap<>();
140    hostsToIndex = new HashMap<>();
141    racksToIndex = new HashMap<>();
142    tablesToIndex = new HashMap<>();
143
144    // TODO: We should get the list of tables from master
145    tables = new ArrayList<>();
146    this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
147
148    numRegions = 0;
149
150    List<List<Integer>> serversPerHostList = new ArrayList<>();
151    List<List<Integer>> serversPerRackList = new ArrayList<>();
152    this.clusterState = clusterState;
153    this.regionFinder = regionFinder;
154
155    // Use servername and port as there can be dead servers in this list. We want everything with
156    // a matching hostname and port to have the same index.
157    for (ServerName sn : clusterState.keySet()) {
158      if (sn == null) {
159        LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); "
160          + "skipping; unassigned regions?");
161        if (LOG.isTraceEnabled()) {
162          LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
163        }
164        continue;
165      }
166      if (serversToIndex.get(sn.getAddress()) == null) {
167        serversToIndex.put(sn.getAddress(), numServers++);
168      }
169      if (!hostsToIndex.containsKey(sn.getHostname())) {
170        hostsToIndex.put(sn.getHostname(), numHosts++);
171        serversPerHostList.add(new ArrayList<>(1));
172      }
173
174      int serverIndex = serversToIndex.get(sn.getAddress());
175      int hostIndex = hostsToIndex.get(sn.getHostname());
176      serversPerHostList.get(hostIndex).add(serverIndex);
177
178      String rack = this.rackManager.getRack(sn);
179
180      if (!racksToIndex.containsKey(rack)) {
181        racksToIndex.put(rack, numRacks++);
182        serversPerRackList.add(new ArrayList<>());
183      }
184      int rackIndex = racksToIndex.get(rack);
185      serversPerRackList.get(rackIndex).add(serverIndex);
186    }
187
188    LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);
189    // Count how many regions there are.
190    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
191      numRegions += entry.getValue().size();
192    }
193    numRegions += unassignedRegions.size();
194
195    regionsToIndex = new HashMap<>(numRegions);
196    servers = new ServerName[numServers];
197    serversPerHost = new int[numHosts][];
198    serversPerRack = new int[numRacks][];
199    regions = new RegionInfo[numRegions];
200    regionIndexToServerIndex = new int[numRegions];
201    initialRegionIndexToServerIndex = new int[numRegions];
202    regionIndexToTableIndex = new int[numRegions];
203    regionIndexToPrimaryIndex = new int[numRegions];
204    regionLoads = new Deque[numRegions];
205
206    regionLocations = new int[numRegions][];
207    serverIndicesSortedByRegionCount = new Integer[numServers];
208    serverIndicesSortedByLocality = new Integer[numServers];
209    localityPerServer = new float[numServers];
210
211    serverIndexToHostIndex = new int[numServers];
212    serverIndexToRackIndex = new int[numServers];
213    regionsPerServer = new int[numServers][];
214    serverIndexToRegionsOffset = new int[numServers];
215    regionsPerHost = new int[numHosts][];
216    regionsPerRack = new int[numRacks][];
217    colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers];
218    colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts];
219    colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks];
220
221    int regionIndex = 0, regionPerServerIndex = 0;
222
223    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
224      if (entry.getKey() == null) {
225        LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
226        continue;
227      }
228      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
229
230      // keep the servername if this is the first server name for this hostname
231      // or this servername has the newest startcode.
232      if (
233        servers[serverIndex] == null
234          || servers[serverIndex].getStartcode() < entry.getKey().getStartcode()
235      ) {
236        servers[serverIndex] = entry.getKey();
237      }
238
239      if (regionsPerServer[serverIndex] != null) {
240        // there is another server with the same hostAndPort in ClusterState.
241        // allocate the array for the total size
242        regionsPerServer[serverIndex] =
243          new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
244      } else {
245        regionsPerServer[serverIndex] = new int[entry.getValue().size()];
246      }
247      colocatedReplicaCountsPerServer[serverIndex] =
248        new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
249      serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
250      serverIndicesSortedByLocality[serverIndex] = serverIndex;
251    }
252
253    hosts = new String[numHosts];
254    for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) {
255      hosts[entry.getValue()] = entry.getKey();
256    }
257    racks = new String[numRacks];
258    for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) {
259      racks[entry.getValue()] = entry.getKey();
260    }
261
262    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
263      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
264      regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
265
266      int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
267      serverIndexToHostIndex[serverIndex] = hostIndex;
268
269      int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
270      serverIndexToRackIndex[serverIndex] = rackIndex;
271
272      for (RegionInfo region : entry.getValue()) {
273        registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
274        regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
275        regionIndex++;
276      }
277      serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
278    }
279
280    for (RegionInfo region : unassignedRegions) {
281      registerRegion(region, regionIndex, -1, loads, regionFinder);
282      regionIndex++;
283    }
284
285    if (LOG.isDebugEnabled()) {
286      for (int i = 0; i < numServers; i++) {
287        LOG.debug("server {} has {} regions", i, regionsPerServer[i].length);
288      }
289    }
290    for (int i = 0; i < serversPerHostList.size(); i++) {
291      serversPerHost[i] = new int[serversPerHostList.get(i).size()];
292      for (int j = 0; j < serversPerHost[i].length; j++) {
293        serversPerHost[i][j] = serversPerHostList.get(i).get(j);
294        LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i);
295      }
296      if (serversPerHost[i].length > 1) {
297        multiServersPerHost = true;
298      }
299    }
300
301    for (int i = 0; i < serversPerRackList.size(); i++) {
302      serversPerRack[i] = new int[serversPerRackList.get(i).size()];
303      for (int j = 0; j < serversPerRack[i].length; j++) {
304        serversPerRack[i][j] = serversPerRackList.get(i).get(j);
305        LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
306      }
307    }
308
309    numTables = tables.size();
310    LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts,
311      numRacks);
312    numRegionsPerServerPerTable = new int[numTables][numServers];
313    numRegionsPerTable = new int[numTables];
314
315    for (int i = 0; i < numTables; i++) {
316      for (int j = 0; j < numServers; j++) {
317        numRegionsPerServerPerTable[i][j] = 0;
318      }
319    }
320
321    for (int i = 0; i < regionIndexToServerIndex.length; i++) {
322      if (regionIndexToServerIndex[i] >= 0) {
323        numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++;
324        numRegionsPerTable[regionIndexToTableIndex[i]]++;
325      }
326    }
327
328    // Avoid repeated computation for planning
329    meanRegionsPerTable = new double[numTables];
330
331    for (int i = 0; i < numTables; i++) {
332      meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
333    }
334
335    for (int i = 0; i < regions.length; i++) {
336      RegionInfo info = regions[i];
337      if (RegionReplicaUtil.isDefaultReplica(info)) {
338        regionIndexToPrimaryIndex[i] = i;
339      } else {
340        hasRegionReplicas = true;
341        RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
342        regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
343      }
344    }
345
346    for (int i = 0; i < regionsPerServer.length; i++) {
347      colocatedReplicaCountsPerServer[i] =
348        new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
349      for (int j = 0; j < regionsPerServer[i].length; j++) {
350        int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
351        colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex);
352      }
353    }
354    // compute regionsPerHost
355    if (multiServersPerHost) {
356      populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost,
357        serversPerHost);
358    }
359
360    // compute regionsPerRack
361    if (numRacks > 1) {
362      populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
363        serversPerRack);
364    }
365  }
366
367  private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
368    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) {
369    for (int i = 0; i < serversPerLocation.length; i++) {
370      int numRegionsPerLocation = 0;
371      for (int j = 0; j < serversPerLocation[i].length; j++) {
372        numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length;
373      }
374      regionsPerLocation[i] = new int[numRegionsPerLocation];
375      colocatedReplicaCountsPerLocation[i] =
376        new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0);
377    }
378
379    for (int i = 0; i < serversPerLocation.length; i++) {
380      int numRegionPerLocationIndex = 0;
381      for (int j = 0; j < serversPerLocation[i].length; j++) {
382        for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) {
383          int region = regionsPerServer[serversPerLocation[i][j]][k];
384          regionsPerLocation[i][numRegionPerLocationIndex] = region;
385          int primaryIndex = regionIndexToPrimaryIndex[region];
386          colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex);
387          numRegionPerLocationIndex++;
388        }
389      }
390    }
391
392  }
393
394  /** Helper for Cluster constructor to handle a region */
395  private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
396    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) {
397    String tableName = region.getTable().getNameAsString();
398    if (!tablesToIndex.containsKey(tableName)) {
399      tables.add(tableName);
400      tablesToIndex.put(tableName, tablesToIndex.size());
401    }
402    int tableIndex = tablesToIndex.get(tableName);
403
404    regionsToIndex.put(region, regionIndex);
405    regions[regionIndex] = region;
406    regionIndexToServerIndex[regionIndex] = serverIndex;
407    initialRegionIndexToServerIndex[regionIndex] = serverIndex;
408    regionIndexToTableIndex[regionIndex] = tableIndex;
409
410    // region load
411    if (loads != null) {
412      Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
413      // That could have failed if the RegionLoad is using the other regionName
414      if (rl == null) {
415        // Try getting the region load using encoded name.
416        rl = loads.get(region.getEncodedName());
417      }
418      regionLoads[regionIndex] = rl;
419    }
420
421    if (regionFinder != null) {
422      // region location
423      List<ServerName> loc = regionFinder.getTopBlockLocations(region);
424      regionLocations[regionIndex] = new int[loc.size()];
425      for (int i = 0; i < loc.size(); i++) {
426        regionLocations[regionIndex][i] = loc.get(i) == null
427          ? -1
428          : (serversToIndex.get(loc.get(i).getAddress()) == null
429            ? -1
430            : serversToIndex.get(loc.get(i).getAddress()));
431      }
432    }
433  }
434
435  /**
436   * Returns true iff a given server has less regions than the balanced amount
437   */
438  public boolean serverHasTooFewRegions(int server) {
439    int minLoad = this.numRegions / numServers;
440    int numRegions = getNumRegions(server);
441    return numRegions < minLoad;
442  }
443
444  /**
445   * Retrieves and lazily initializes a field storing the locality of every region/server
446   * combination
447   */
448  public float[][] getOrComputeRackLocalities() {
449    if (rackLocalities == null || regionsToMostLocalEntities == null) {
450      computeCachedLocalities();
451    }
452    return rackLocalities;
453  }
454
455  /**
456   * Lazily initializes and retrieves a mapping of region -> server for which region has the highest
457   * the locality
458   */
459  public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) {
460    if (rackLocalities == null || regionsToMostLocalEntities == null) {
461      computeCachedLocalities();
462    }
463    return regionsToMostLocalEntities[type.ordinal()];
464  }
465
466  /**
467   * Looks up locality from cache of localities. Will create cache if it does not already exist.
468   */
469  public float getOrComputeLocality(int region, int entity,
470    BalancerClusterState.LocalityType type) {
471    switch (type) {
472      case SERVER:
473        return getLocalityOfRegion(region, entity);
474      case RACK:
475        return getOrComputeRackLocalities()[region][entity];
476      default:
477        throw new IllegalArgumentException("Unsupported LocalityType: " + type);
478    }
479  }
480
481  /**
482   * Returns locality weighted by region size in MB. Will create locality cache if it does not
483   * already exist.
484   */
485  public double getOrComputeWeightedLocality(int region, int server,
486    BalancerClusterState.LocalityType type) {
487    return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
488  }
489
490  /**
491   * Returns the size in MB from the most recent RegionLoad for region
492   */
493  public int getRegionSizeMB(int region) {
494    Deque<BalancerRegionLoad> load = regionLoads[region];
495    // This means regions have no actual data on disk
496    if (load == null) {
497      return 0;
498    }
499    return regionLoads[region].getLast().getStorefileSizeMB();
500  }
501
502  /**
503   * Computes and caches the locality for each region/rack combinations, as well as storing a
504   * mapping of region -> server and region -> rack such that server and rack have the highest
505   * locality for region
506   */
507  private void computeCachedLocalities() {
508    rackLocalities = new float[numRegions][numRacks];
509    regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
510
511    // Compute localities and find most local server per region
512    for (int region = 0; region < numRegions; region++) {
513      int serverWithBestLocality = 0;
514      float bestLocalityForRegion = 0;
515      for (int server = 0; server < numServers; server++) {
516        // Aggregate per-rack locality
517        float locality = getLocalityOfRegion(region, server);
518        int rack = serverIndexToRackIndex[server];
519        int numServersInRack = serversPerRack[rack].length;
520        rackLocalities[region][rack] += locality / numServersInRack;
521
522        if (locality > bestLocalityForRegion) {
523          serverWithBestLocality = server;
524          bestLocalityForRegion = locality;
525        }
526      }
527      regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
528
529      // Find most local rack per region
530      int rackWithBestLocality = 0;
531      float bestRackLocalityForRegion = 0.0f;
532      for (int rack = 0; rack < numRacks; rack++) {
533        float rackLocality = rackLocalities[region][rack];
534        if (rackLocality > bestRackLocalityForRegion) {
535          bestRackLocalityForRegion = rackLocality;
536          rackWithBestLocality = rack;
537        }
538      }
539      regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
540    }
541
542  }
543
544  /**
545   * Maps region index to rack index
546   */
547  public int getRackForRegion(int region) {
548    return serverIndexToRackIndex[regionIndexToServerIndex[region]];
549  }
550
551  enum LocalityType {
552    SERVER,
553    RACK
554  }
555
556  public void doAction(BalanceAction action) {
557    switch (action.getType()) {
558      case NULL:
559        break;
560      case ASSIGN_REGION:
561        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
562        assert action instanceof AssignRegionAction : action.getClass();
563        AssignRegionAction ar = (AssignRegionAction) action;
564        regionsPerServer[ar.getServer()] =
565          addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
566        regionMoved(ar.getRegion(), -1, ar.getServer());
567        break;
568      case MOVE_REGION:
569        assert action instanceof MoveRegionAction : action.getClass();
570        MoveRegionAction mra = (MoveRegionAction) action;
571        regionsPerServer[mra.getFromServer()] =
572          removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
573        regionsPerServer[mra.getToServer()] =
574          addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
575        regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
576        break;
577      case SWAP_REGIONS:
578        assert action instanceof SwapRegionsAction : action.getClass();
579        SwapRegionsAction a = (SwapRegionsAction) action;
580        regionsPerServer[a.getFromServer()] =
581          replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
582        regionsPerServer[a.getToServer()] =
583          replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
584        regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
585        regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
586        break;
587      default:
588        throw new RuntimeException("Uknown action:" + action.getType());
589    }
590  }
591
592  /**
593   * Return true if the placement of region on server would lower the availability of the region in
594   * question
595   * @return true or false
596   */
597  boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
598    if (!serversToIndex.containsKey(serverName.getAddress())) {
599      return false; // safeguard against race between cluster.servers and servers from LB method
600      // args
601    }
602    int server = serversToIndex.get(serverName.getAddress());
603    int region = regionsToIndex.get(regionInfo);
604
605    // Region replicas for same region should better assign to different servers
606    for (int i : regionsPerServer[server]) {
607      RegionInfo otherRegionInfo = regions[i];
608      if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
609        return true;
610      }
611    }
612
613    int primary = regionIndexToPrimaryIndex[region];
614    if (primary == -1) {
615      return false;
616    }
617    // there is a subset relation for server < host < rack
618    // check server first
619    int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary);
620    if (result != 0) {
621      return result > 0;
622    }
623
624    // check host
625    if (multiServersPerHost) {
626      result = checkLocationForPrimary(serverIndexToHostIndex[server],
627        colocatedReplicaCountsPerHost, primary);
628      if (result != 0) {
629        return result > 0;
630      }
631    }
632
633    // check rack
634    if (numRacks > 1) {
635      result = checkLocationForPrimary(serverIndexToRackIndex[server],
636        colocatedReplicaCountsPerRack, primary);
637      if (result != 0) {
638        return result > 0;
639      }
640    }
641    return false;
642  }
643
644  /**
645   * Common method for better solution check.
646   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
647   *                                          colocatedReplicaCountsPerRack
648   * @return 1 for better, -1 for no better, 0 for unknown
649   */
650  private int checkLocationForPrimary(int location,
651    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) {
652    if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) {
653      // check for whether there are other Locations that we can place this region
654      for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) {
655        if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) {
656          return 1; // meaning there is a better Location
657        }
658      }
659      return -1; // there is not a better Location to place this
660    }
661    return 0;
662  }
663
664  void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
665    if (!serversToIndex.containsKey(serverName.getAddress())) {
666      return;
667    }
668    int server = serversToIndex.get(serverName.getAddress());
669    int region = regionsToIndex.get(regionInfo);
670    doAction(new AssignRegionAction(region, server));
671  }
672
673  void regionMoved(int region, int oldServer, int newServer) {
674    regionIndexToServerIndex[region] = newServer;
675    if (initialRegionIndexToServerIndex[region] == newServer) {
676      numMovedRegions--; // region moved back to original location
677    } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
678      numMovedRegions++; // region moved from original location
679    }
680    int tableIndex = regionIndexToTableIndex[region];
681    if (oldServer >= 0) {
682      numRegionsPerServerPerTable[tableIndex][oldServer]--;
683    }
684    numRegionsPerServerPerTable[tableIndex][newServer]++;
685
686    // update for servers
687    int primary = regionIndexToPrimaryIndex[region];
688    if (oldServer >= 0) {
689      colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary);
690    }
691    colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary);
692
693    // update for hosts
694    if (multiServersPerHost) {
695      updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost,
696        oldServer, newServer, primary, region);
697    }
698
699    // update for racks
700    if (numRacks > 1) {
701      updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
702        oldServer, newServer, primary, region);
703    }
704  }
705
706  /**
707   * Common method for per host and per Location region index updates when a region is moved.
708   * @param serverIndexToLocation             serverIndexToHostIndex or serverIndexToLocationIndex
709   * @param regionsPerLocation                regionsPerHost or regionsPerLocation
710   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
711   *                                          colocatedReplicaCountsPerRack
712   */
713  private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation,
714    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer,
715    int primary, int region) {
716    int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1;
717    int newLocation = serverIndexToLocation[newServer];
718    if (newLocation != oldLocation) {
719      regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region);
720      colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary);
721      if (oldLocation >= 0) {
722        regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region);
723        colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary);
724      }
725    }
726
727  }
728
729  int[] removeRegion(int[] regions, int regionIndex) {
730    // TODO: this maybe costly. Consider using linked lists
731    int[] newRegions = new int[regions.length - 1];
732    int i = 0;
733    for (i = 0; i < regions.length; i++) {
734      if (regions[i] == regionIndex) {
735        break;
736      }
737      newRegions[i] = regions[i];
738    }
739    System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
740    return newRegions;
741  }
742
743  int[] addRegion(int[] regions, int regionIndex) {
744    int[] newRegions = new int[regions.length + 1];
745    System.arraycopy(regions, 0, newRegions, 0, regions.length);
746    newRegions[newRegions.length - 1] = regionIndex;
747    return newRegions;
748  }
749
750  int[] addRegionSorted(int[] regions, int regionIndex) {
751    int[] newRegions = new int[regions.length + 1];
752    int i = 0;
753    for (i = 0; i < regions.length; i++) { // find the index to insert
754      if (regions[i] > regionIndex) {
755        break;
756      }
757    }
758    System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
759    System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
760    newRegions[i] = regionIndex;
761
762    return newRegions;
763  }
764
765  int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
766    int i = 0;
767    for (i = 0; i < regions.length; i++) {
768      if (regions[i] == regionIndex) {
769        regions[i] = newRegionIndex;
770        break;
771      }
772    }
773    return regions;
774  }
775
776  void sortServersByRegionCount() {
777    Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
778  }
779
780  int getNumRegions(int server) {
781    return regionsPerServer[server].length;
782  }
783
784  boolean contains(int[] arr, int val) {
785    return Arrays.binarySearch(arr, val) >= 0;
786  }
787
788  private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
789
790  public Comparator<Integer> getNumRegionsComparator() {
791    return numRegionsComparator;
792  }
793
794  int getLowestLocalityRegionOnServer(int serverIndex) {
795    if (regionFinder != null) {
796      float lowestLocality = 1.0f;
797      int lowestLocalityRegionIndex = -1;
798      if (regionsPerServer[serverIndex].length == 0) {
799        // No regions on that region server
800        return -1;
801      }
802      for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
803        int regionIndex = regionsPerServer[serverIndex][j];
804        HDFSBlocksDistribution distribution =
805          regionFinder.getBlockDistribution(regions[regionIndex]);
806        float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
807        // skip empty region
808        if (distribution.getUniqueBlocksTotalWeight() == 0) {
809          continue;
810        }
811        if (locality < lowestLocality) {
812          lowestLocality = locality;
813          lowestLocalityRegionIndex = j;
814        }
815      }
816      if (lowestLocalityRegionIndex == -1) {
817        return -1;
818      }
819      if (LOG.isTraceEnabled()) {
820        LOG.trace("Lowest locality region is "
821          + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
822            .getRegionNameAsString()
823          + " with locality " + lowestLocality + " and its region server contains "
824          + regionsPerServer[serverIndex].length + " regions");
825      }
826      return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
827    } else {
828      return -1;
829    }
830  }
831
832  float getLocalityOfRegion(int region, int server) {
833    if (regionFinder != null) {
834      HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
835      return distribution.getBlockLocalityIndex(servers[server].getHostname());
836    } else {
837      return 0f;
838    }
839  }
840
841  void setNumRegions(int numRegions) {
842    this.numRegions = numRegions;
843  }
844
845  void setNumMovedRegions(int numMovedRegions) {
846    this.numMovedRegions = numMovedRegions;
847  }
848
849  @Override
850  public String toString() {
851    StringBuilder desc = new StringBuilder("Cluster={servers=[");
852    for (ServerName sn : servers) {
853      desc.append(sn.getAddress().toString()).append(", ");
854    }
855    desc.append("], serverIndicesSortedByRegionCount=")
856      .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
857      .append(Arrays.deepToString(regionsPerServer));
858
859    desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
860      .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
861      .append('}');
862    return desc.toString();
863  }
864}