001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import org.agrona.collections.Hashing;
030import org.agrona.collections.Int2IntCounterMap;
031import org.apache.hadoop.hbase.HDFSBlocksDistribution;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.RegionReplicaUtil;
035import org.apache.hadoop.hbase.master.RackManager;
036import org.apache.hadoop.hbase.net.Address;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * An efficient array based implementation similar to ClusterState for keeping the status of the
043 * cluster in terms of region assignment and distribution. LoadBalancers, such as
044 * StochasticLoadBalancer uses this Cluster object because of hundreds of thousands of hashmap
045 * manipulations are very costly, which is why this class uses mostly indexes and arrays.
046 * <p/>
047 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
048 * topology in terms of server names, hostnames and racks.
049 */
050@InterfaceAudience.Private
051class BalancerClusterState {
052
053  private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);
054
055  ServerName[] servers;
056  // ServerName uniquely identifies a region server. multiple RS can run on the same host
057  String[] hosts;
058  String[] racks;
059  boolean multiServersPerHost = false; // whether or not any host has more than one server
060
061  ArrayList<String> tables;
062  RegionInfo[] regions;
063  Deque<BalancerRegionLoad>[] regionLoads;
064  private RegionLocationFinder regionFinder;
065
066  int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality
067
068  int[] serverIndexToHostIndex; // serverIndex -> host index
069  int[] serverIndexToRackIndex; // serverIndex -> rack index
070
071  int[][] regionsPerServer; // serverIndex -> region list
072  int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
073  int[][] regionsPerHost; // hostIndex -> list of regions
074  int[][] regionsPerRack; // rackIndex -> region list
075  Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated
076  // replicas by primary region index
077  Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by
078  // primary region index
079  Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by
080  // primary region index
081
082  int[][] serversPerHost; // hostIndex -> list of server indexes
083  int[][] serversPerRack; // rackIndex -> list of server indexes
084  int[] regionIndexToServerIndex; // regionIndex -> serverIndex
085  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
086  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
087  int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
088  int[] numRegionsPerTable; // tableIndex -> region count
089  int[] numMaxRegionsPerTable; // tableIndex -> max number of regions in a single RS
090  int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
091  boolean hasRegionReplicas = false; // whether there is regions with replicas
092
093  Integer[] serverIndicesSortedByRegionCount;
094  Integer[] serverIndicesSortedByLocality;
095
096  Map<Address, Integer> serversToIndex;
097  Map<String, Integer> hostsToIndex;
098  Map<String, Integer> racksToIndex;
099  Map<String, Integer> tablesToIndex;
100  Map<RegionInfo, Integer> regionsToIndex;
101  float[] localityPerServer;
102
103  int numServers;
104  int numHosts;
105  int numRacks;
106  int numTables;
107  int numRegions;
108
109  int numMovedRegions = 0; // num moved regions from the initial configuration
110  Map<ServerName, List<RegionInfo>> clusterState;
111
112  private final RackManager rackManager;
113  // Maps region -> rackIndex -> locality of region on rack
114  private float[][] rackLocalities;
115  // Maps localityType -> region -> [server|rack]Index with highest locality
116  private int[][] regionsToMostLocalEntities;
117
118  static class DefaultRackManager extends RackManager {
119    @Override
120    public String getRack(ServerName server) {
121      return UNKNOWN_RACK;
122    }
123  }
124
125  BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
126    Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder,
127    RackManager rackManager) {
128    this(null, clusterState, loads, regionFinder, rackManager);
129  }
130
131  @SuppressWarnings("unchecked")
132  BalancerClusterState(Collection<RegionInfo> unassignedRegions,
133    Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
134    RegionLocationFinder regionFinder, RackManager rackManager) {
135    if (unassignedRegions == null) {
136      unassignedRegions = Collections.emptyList();
137    }
138
139    serversToIndex = new HashMap<>();
140    hostsToIndex = new HashMap<>();
141    racksToIndex = new HashMap<>();
142    tablesToIndex = new HashMap<>();
143
144    // TODO: We should get the list of tables from master
145    tables = new ArrayList<>();
146    this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
147
148    numRegions = 0;
149
150    List<List<Integer>> serversPerHostList = new ArrayList<>();
151    List<List<Integer>> serversPerRackList = new ArrayList<>();
152    this.clusterState = clusterState;
153    this.regionFinder = regionFinder;
154
155    // Use servername and port as there can be dead servers in this list. We want everything with
156    // a matching hostname and port to have the same index.
157    for (ServerName sn : clusterState.keySet()) {
158      if (sn == null) {
159        LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); "
160          + "skipping; unassigned regions?");
161        if (LOG.isTraceEnabled()) {
162          LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
163        }
164        continue;
165      }
166      if (serversToIndex.get(sn.getAddress()) == null) {
167        serversToIndex.put(sn.getAddress(), numServers++);
168      }
169      if (!hostsToIndex.containsKey(sn.getHostname())) {
170        hostsToIndex.put(sn.getHostname(), numHosts++);
171        serversPerHostList.add(new ArrayList<>(1));
172      }
173
174      int serverIndex = serversToIndex.get(sn.getAddress());
175      int hostIndex = hostsToIndex.get(sn.getHostname());
176      serversPerHostList.get(hostIndex).add(serverIndex);
177
178      String rack = this.rackManager.getRack(sn);
179      if (!racksToIndex.containsKey(rack)) {
180        racksToIndex.put(rack, numRacks++);
181        serversPerRackList.add(new ArrayList<>());
182      }
183      int rackIndex = racksToIndex.get(rack);
184      serversPerRackList.get(rackIndex).add(serverIndex);
185    }
186    LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);
187    // Count how many regions there are.
188    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
189      numRegions += entry.getValue().size();
190    }
191    numRegions += unassignedRegions.size();
192
193    regionsToIndex = new HashMap<>(numRegions);
194    servers = new ServerName[numServers];
195    serversPerHost = new int[numHosts][];
196    serversPerRack = new int[numRacks][];
197    regions = new RegionInfo[numRegions];
198    regionIndexToServerIndex = new int[numRegions];
199    initialRegionIndexToServerIndex = new int[numRegions];
200    regionIndexToTableIndex = new int[numRegions];
201    regionIndexToPrimaryIndex = new int[numRegions];
202    regionLoads = new Deque[numRegions];
203
204    regionLocations = new int[numRegions][];
205    serverIndicesSortedByRegionCount = new Integer[numServers];
206    serverIndicesSortedByLocality = new Integer[numServers];
207    localityPerServer = new float[numServers];
208
209    serverIndexToHostIndex = new int[numServers];
210    serverIndexToRackIndex = new int[numServers];
211    regionsPerServer = new int[numServers][];
212    serverIndexToRegionsOffset = new int[numServers];
213    regionsPerHost = new int[numHosts][];
214    regionsPerRack = new int[numRacks][];
215    colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers];
216    colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts];
217    colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks];
218
219    int regionIndex = 0, regionPerServerIndex = 0;
220
221    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
222      if (entry.getKey() == null) {
223        LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
224        continue;
225      }
226      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
227
228      // keep the servername if this is the first server name for this hostname
229      // or this servername has the newest startcode.
230      if (
231        servers[serverIndex] == null
232          || servers[serverIndex].getStartcode() < entry.getKey().getStartcode()
233      ) {
234        servers[serverIndex] = entry.getKey();
235      }
236
237      if (regionsPerServer[serverIndex] != null) {
238        // there is another server with the same hostAndPort in ClusterState.
239        // allocate the array for the total size
240        regionsPerServer[serverIndex] =
241          new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
242      } else {
243        regionsPerServer[serverIndex] = new int[entry.getValue().size()];
244      }
245      colocatedReplicaCountsPerServer[serverIndex] =
246        new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
247      serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
248      serverIndicesSortedByLocality[serverIndex] = serverIndex;
249    }
250
251    hosts = new String[numHosts];
252    for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) {
253      hosts[entry.getValue()] = entry.getKey();
254    }
255    racks = new String[numRacks];
256    for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) {
257      racks[entry.getValue()] = entry.getKey();
258    }
259
260    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
261      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
262      regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
263
264      int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
265      serverIndexToHostIndex[serverIndex] = hostIndex;
266
267      int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
268      serverIndexToRackIndex[serverIndex] = rackIndex;
269
270      for (RegionInfo region : entry.getValue()) {
271        registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
272        regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
273        regionIndex++;
274      }
275      serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
276    }
277
278    for (RegionInfo region : unassignedRegions) {
279      registerRegion(region, regionIndex, -1, loads, regionFinder);
280      regionIndex++;
281    }
282
283    for (int i = 0; i < serversPerHostList.size(); i++) {
284      serversPerHost[i] = new int[serversPerHostList.get(i).size()];
285      for (int j = 0; j < serversPerHost[i].length; j++) {
286        serversPerHost[i][j] = serversPerHostList.get(i).get(j);
287        LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i);
288      }
289      if (serversPerHost[i].length > 1) {
290        multiServersPerHost = true;
291      }
292    }
293
294    for (int i = 0; i < serversPerRackList.size(); i++) {
295      serversPerRack[i] = new int[serversPerRackList.get(i).size()];
296      for (int j = 0; j < serversPerRack[i].length; j++) {
297        serversPerRack[i][j] = serversPerRackList.get(i).get(j);
298        LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
299      }
300    }
301
302    numTables = tables.size();
303    LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts,
304      numRacks);
305    numRegionsPerServerPerTable = new int[numTables][numServers];
306    numRegionsPerTable = new int[numTables];
307
308    for (int i = 0; i < numTables; i++) {
309      for (int j = 0; j < numServers; j++) {
310        numRegionsPerServerPerTable[i][j] = 0;
311      }
312    }
313
314    for (int i = 0; i < regionIndexToServerIndex.length; i++) {
315      if (regionIndexToServerIndex[i] >= 0) {
316        numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++;
317        numRegionsPerTable[regionIndexToTableIndex[i]]++;
318      }
319    }
320
321    for (int i = 0; i < regions.length; i++) {
322      RegionInfo info = regions[i];
323      if (RegionReplicaUtil.isDefaultReplica(info)) {
324        regionIndexToPrimaryIndex[i] = i;
325      } else {
326        hasRegionReplicas = true;
327        RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
328        regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
329      }
330    }
331
332    for (int i = 0; i < regionsPerServer.length; i++) {
333      colocatedReplicaCountsPerServer[i] =
334        new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
335      for (int j = 0; j < regionsPerServer[i].length; j++) {
336        int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
337        colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex);
338      }
339    }
340    // compute regionsPerHost
341    if (multiServersPerHost) {
342      populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost,
343        serversPerHost);
344    }
345
346    // compute regionsPerRack
347    if (numRacks > 1) {
348      populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
349        serversPerRack);
350    }
351  }
352
353  private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
354    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) {
355    for (int i = 0; i < serversPerLocation.length; i++) {
356      int numRegionsPerLocation = 0;
357      for (int j = 0; j < serversPerLocation[i].length; j++) {
358        numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length;
359      }
360      regionsPerLocation[i] = new int[numRegionsPerLocation];
361      colocatedReplicaCountsPerLocation[i] =
362        new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0);
363    }
364
365    for (int i = 0; i < serversPerLocation.length; i++) {
366      int numRegionPerLocationIndex = 0;
367      for (int j = 0; j < serversPerLocation[i].length; j++) {
368        for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) {
369          int region = regionsPerServer[serversPerLocation[i][j]][k];
370          regionsPerLocation[i][numRegionPerLocationIndex] = region;
371          int primaryIndex = regionIndexToPrimaryIndex[region];
372          colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex);
373          numRegionPerLocationIndex++;
374        }
375      }
376    }
377
378  }
379
380  /** Helper for Cluster constructor to handle a region */
381  private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
382    Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder) {
383    String tableName = region.getTable().getNameAsString();
384    if (!tablesToIndex.containsKey(tableName)) {
385      tables.add(tableName);
386      tablesToIndex.put(tableName, tablesToIndex.size());
387    }
388    int tableIndex = tablesToIndex.get(tableName);
389
390    regionsToIndex.put(region, regionIndex);
391    regions[regionIndex] = region;
392    regionIndexToServerIndex[regionIndex] = serverIndex;
393    initialRegionIndexToServerIndex[regionIndex] = serverIndex;
394    regionIndexToTableIndex[regionIndex] = tableIndex;
395
396    // region load
397    if (loads != null) {
398      Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
399      // That could have failed if the RegionLoad is using the other regionName
400      if (rl == null) {
401        // Try getting the region load using encoded name.
402        rl = loads.get(region.getEncodedName());
403      }
404      regionLoads[regionIndex] = rl;
405    }
406
407    if (regionFinder != null) {
408      // region location
409      List<ServerName> loc = regionFinder.getTopBlockLocations(region);
410      regionLocations[regionIndex] = new int[loc.size()];
411      for (int i = 0; i < loc.size(); i++) {
412        regionLocations[regionIndex][i] = loc.get(i) == null
413          ? -1
414          : (serversToIndex.get(loc.get(i).getAddress()) == null
415            ? -1
416            : serversToIndex.get(loc.get(i).getAddress()));
417      }
418    }
419  }
420
421  /**
422   * Returns true iff a given server has less regions than the balanced amount
423   */
424  public boolean serverHasTooFewRegions(int server) {
425    int minLoad = this.numRegions / numServers;
426    int numRegions = getNumRegions(server);
427    return numRegions < minLoad;
428  }
429
430  /**
431   * Retrieves and lazily initializes a field storing the locality of every region/server
432   * combination
433   */
434  public float[][] getOrComputeRackLocalities() {
435    if (rackLocalities == null || regionsToMostLocalEntities == null) {
436      computeCachedLocalities();
437    }
438    return rackLocalities;
439  }
440
441  /**
442   * Lazily initializes and retrieves a mapping of region -> server for which region has the highest
443   * the locality
444   */
445  public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) {
446    if (rackLocalities == null || regionsToMostLocalEntities == null) {
447      computeCachedLocalities();
448    }
449    return regionsToMostLocalEntities[type.ordinal()];
450  }
451
452  /**
453   * Looks up locality from cache of localities. Will create cache if it does not already exist.
454   */
455  public float getOrComputeLocality(int region, int entity,
456    BalancerClusterState.LocalityType type) {
457    switch (type) {
458      case SERVER:
459        return getLocalityOfRegion(region, entity);
460      case RACK:
461        return getOrComputeRackLocalities()[region][entity];
462      default:
463        throw new IllegalArgumentException("Unsupported LocalityType: " + type);
464    }
465  }
466
467  /**
468   * Returns locality weighted by region size in MB. Will create locality cache if it does not
469   * already exist.
470   */
471  public double getOrComputeWeightedLocality(int region, int server,
472    BalancerClusterState.LocalityType type) {
473    return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
474  }
475
476  /**
477   * Returns the size in MB from the most recent RegionLoad for region
478   */
479  public int getRegionSizeMB(int region) {
480    Deque<BalancerRegionLoad> load = regionLoads[region];
481    // This means regions have no actual data on disk
482    if (load == null) {
483      return 0;
484    }
485    return regionLoads[region].getLast().getStorefileSizeMB();
486  }
487
488  /**
489   * Computes and caches the locality for each region/rack combinations, as well as storing a
490   * mapping of region -> server and region -> rack such that server and rack have the highest
491   * locality for region
492   */
493  private void computeCachedLocalities() {
494    rackLocalities = new float[numRegions][numRacks];
495    regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
496
497    // Compute localities and find most local server per region
498    for (int region = 0; region < numRegions; region++) {
499      int serverWithBestLocality = 0;
500      float bestLocalityForRegion = 0;
501      for (int server = 0; server < numServers; server++) {
502        // Aggregate per-rack locality
503        float locality = getLocalityOfRegion(region, server);
504        int rack = serverIndexToRackIndex[server];
505        int numServersInRack = serversPerRack[rack].length;
506        rackLocalities[region][rack] += locality / numServersInRack;
507
508        if (locality > bestLocalityForRegion) {
509          serverWithBestLocality = server;
510          bestLocalityForRegion = locality;
511        }
512      }
513      regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
514
515      // Find most local rack per region
516      int rackWithBestLocality = 0;
517      float bestRackLocalityForRegion = 0.0f;
518      for (int rack = 0; rack < numRacks; rack++) {
519        float rackLocality = rackLocalities[region][rack];
520        if (rackLocality > bestRackLocalityForRegion) {
521          bestRackLocalityForRegion = rackLocality;
522          rackWithBestLocality = rack;
523        }
524      }
525      regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
526    }
527
528  }
529
530  /**
531   * Maps region index to rack index
532   */
533  public int getRackForRegion(int region) {
534    return serverIndexToRackIndex[regionIndexToServerIndex[region]];
535  }
536
537  enum LocalityType {
538    SERVER,
539    RACK
540  }
541
542  public void doAction(BalanceAction action) {
543    switch (action.getType()) {
544      case NULL:
545        break;
546      case ASSIGN_REGION:
547        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
548        assert action instanceof AssignRegionAction : action.getClass();
549        AssignRegionAction ar = (AssignRegionAction) action;
550        regionsPerServer[ar.getServer()] =
551          addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
552        regionMoved(ar.getRegion(), -1, ar.getServer());
553        break;
554      case MOVE_REGION:
555        assert action instanceof MoveRegionAction : action.getClass();
556        MoveRegionAction mra = (MoveRegionAction) action;
557        regionsPerServer[mra.getFromServer()] =
558          removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
559        regionsPerServer[mra.getToServer()] =
560          addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
561        regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
562        break;
563      case SWAP_REGIONS:
564        assert action instanceof SwapRegionsAction : action.getClass();
565        SwapRegionsAction a = (SwapRegionsAction) action;
566        regionsPerServer[a.getFromServer()] =
567          replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
568        regionsPerServer[a.getToServer()] =
569          replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
570        regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
571        regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
572        break;
573      default:
574        throw new RuntimeException("Uknown action:" + action.getType());
575    }
576  }
577
578  /**
579   * Return true if the placement of region on server would lower the availability of the region in
580   * question
581   * @return true or false
582   */
583  boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
584    if (!serversToIndex.containsKey(serverName.getAddress())) {
585      return false; // safeguard against race between cluster.servers and servers from LB method
586      // args
587    }
588    int server = serversToIndex.get(serverName.getAddress());
589    int region = regionsToIndex.get(regionInfo);
590
591    // Region replicas for same region should better assign to different servers
592    for (int i : regionsPerServer[server]) {
593      RegionInfo otherRegionInfo = regions[i];
594      if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
595        return true;
596      }
597    }
598
599    int primary = regionIndexToPrimaryIndex[region];
600    if (primary == -1) {
601      return false;
602    }
603    // there is a subset relation for server < host < rack
604    // check server first
605    int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary);
606    if (result != 0) {
607      return result > 0;
608    }
609
610    // check host
611    if (multiServersPerHost) {
612      result = checkLocationForPrimary(serverIndexToHostIndex[server],
613        colocatedReplicaCountsPerHost, primary);
614      if (result != 0) {
615        return result > 0;
616      }
617    }
618
619    // check rack
620    if (numRacks > 1) {
621      result = checkLocationForPrimary(serverIndexToRackIndex[server],
622        colocatedReplicaCountsPerRack, primary);
623      if (result != 0) {
624        return result > 0;
625      }
626    }
627    return false;
628  }
629
630  /**
631   * Common method for better solution check.
632   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
633   *                                          colocatedReplicaCountsPerRack
634   * @return 1 for better, -1 for no better, 0 for unknown
635   */
636  private int checkLocationForPrimary(int location,
637    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) {
638    if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) {
639      // check for whether there are other Locations that we can place this region
640      for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) {
641        if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) {
642          return 1; // meaning there is a better Location
643        }
644      }
645      return -1; // there is not a better Location to place this
646    }
647    return 0;
648  }
649
650  void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
651    if (!serversToIndex.containsKey(serverName.getAddress())) {
652      return;
653    }
654    int server = serversToIndex.get(serverName.getAddress());
655    int region = regionsToIndex.get(regionInfo);
656    doAction(new AssignRegionAction(region, server));
657  }
658
659  void regionMoved(int region, int oldServer, int newServer) {
660    regionIndexToServerIndex[region] = newServer;
661    if (initialRegionIndexToServerIndex[region] == newServer) {
662      numMovedRegions--; // region moved back to original location
663    } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
664      numMovedRegions++; // region moved from original location
665    }
666    int tableIndex = regionIndexToTableIndex[region];
667    if (oldServer >= 0) {
668      numRegionsPerServerPerTable[tableIndex][oldServer]--;
669    }
670    numRegionsPerServerPerTable[tableIndex][newServer]++;
671
672    // update for servers
673    int primary = regionIndexToPrimaryIndex[region];
674    if (oldServer >= 0) {
675      colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary);
676    }
677    colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary);
678
679    // update for hosts
680    if (multiServersPerHost) {
681      updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost,
682        oldServer, newServer, primary, region);
683    }
684
685    // update for racks
686    if (numRacks > 1) {
687      updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
688        oldServer, newServer, primary, region);
689    }
690  }
691
692  /**
693   * Common method for per host and per Location region index updates when a region is moved.
694   * @param serverIndexToLocation             serverIndexToHostIndex or serverIndexToLocationIndex
695   * @param regionsPerLocation                regionsPerHost or regionsPerLocation
696   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
697   *                                          colocatedReplicaCountsPerRack
698   */
699  private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation,
700    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer,
701    int primary, int region) {
702    int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1;
703    int newLocation = serverIndexToLocation[newServer];
704    if (newLocation != oldLocation) {
705      regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region);
706      colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary);
707      if (oldLocation >= 0) {
708        regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region);
709        colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary);
710      }
711    }
712
713  }
714
715  int[] removeRegion(int[] regions, int regionIndex) {
716    // TODO: this maybe costly. Consider using linked lists
717    int[] newRegions = new int[regions.length - 1];
718    int i = 0;
719    for (i = 0; i < regions.length; i++) {
720      if (regions[i] == regionIndex) {
721        break;
722      }
723      newRegions[i] = regions[i];
724    }
725    System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
726    return newRegions;
727  }
728
729  int[] addRegion(int[] regions, int regionIndex) {
730    int[] newRegions = new int[regions.length + 1];
731    System.arraycopy(regions, 0, newRegions, 0, regions.length);
732    newRegions[newRegions.length - 1] = regionIndex;
733    return newRegions;
734  }
735
736  int[] addRegionSorted(int[] regions, int regionIndex) {
737    int[] newRegions = new int[regions.length + 1];
738    int i = 0;
739    for (i = 0; i < regions.length; i++) { // find the index to insert
740      if (regions[i] > regionIndex) {
741        break;
742      }
743    }
744    System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
745    System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
746    newRegions[i] = regionIndex;
747
748    return newRegions;
749  }
750
751  int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
752    int i = 0;
753    for (i = 0; i < regions.length; i++) {
754      if (regions[i] == regionIndex) {
755        regions[i] = newRegionIndex;
756        break;
757      }
758    }
759    return regions;
760  }
761
762  void sortServersByRegionCount() {
763    Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
764  }
765
766  int getNumRegions(int server) {
767    return regionsPerServer[server].length;
768  }
769
770  public Comparator<Integer> getNumRegionsComparator() {
771    return numRegionsComparator;
772  }
773
774  boolean contains(int[] arr, int val) {
775    return Arrays.binarySearch(arr, val) >= 0;
776  }
777
778  private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
779
780  int getLowestLocalityRegionOnServer(int serverIndex) {
781    if (regionFinder != null) {
782      float lowestLocality = 1.0f;
783      int lowestLocalityRegionIndex = -1;
784      if (regionsPerServer[serverIndex].length == 0) {
785        // No regions on that region server
786        return -1;
787      }
788      for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
789        int regionIndex = regionsPerServer[serverIndex][j];
790        HDFSBlocksDistribution distribution =
791          regionFinder.getBlockDistribution(regions[regionIndex]);
792        float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
793        // skip empty region
794        if (distribution.getUniqueBlocksTotalWeight() == 0) {
795          continue;
796        }
797        if (locality < lowestLocality) {
798          lowestLocality = locality;
799          lowestLocalityRegionIndex = j;
800        }
801      }
802      if (lowestLocalityRegionIndex == -1) {
803        return -1;
804      }
805      if (LOG.isTraceEnabled()) {
806        LOG.trace("Lowest locality region is "
807          + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
808            .getRegionNameAsString()
809          + " with locality " + lowestLocality + " and its region server contains "
810          + regionsPerServer[serverIndex].length + " regions");
811      }
812      return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
813    } else {
814      return -1;
815    }
816  }
817
818  float getLocalityOfRegion(int region, int server) {
819    if (regionFinder != null) {
820      HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
821      return distribution.getBlockLocalityIndex(servers[server].getHostname());
822    } else {
823      return 0f;
824    }
825  }
826
827  void setNumRegions(int numRegions) {
828    this.numRegions = numRegions;
829  }
830
831  void setNumMovedRegions(int numMovedRegions) {
832    this.numMovedRegions = numMovedRegions;
833  }
834
835  @Override
836  public String toString() {
837    StringBuilder desc = new StringBuilder("Cluster={servers=[");
838    for (ServerName sn : servers) {
839      desc.append(sn.getAddress().toString()).append(", ");
840    }
841    desc.append("], serverIndicesSortedByRegionCount=")
842      .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
843      .append(Arrays.deepToString(regionsPerServer));
844
845    desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
846      .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
847      .append('}');
848    return desc.toString();
849  }
850}