/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.agrona.collections.Hashing;
import org.agrona.collections.Int2IntCounterMap;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An efficient array based implementation similar to ClusterState for keeping the status of the
 * cluster in terms of region assignment and distribution. LoadBalancers, such as
 * StochasticLoadBalancer, use this Cluster object because hundreds of thousands of hashmap
 * manipulations are very costly, which is why this class uses mostly indexes and arrays.
 * <p>
 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
 * topology in terms of server names, hostnames and racks.
 */
@InterfaceAudience.Private
class BalancerClusterState {

  private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);

  ServerName[] servers;
  // ServerName uniquely identifies a region server. multiple RS can run on the same host
  String[] hosts;
  String[] racks;
  boolean multiServersPerHost = false; // whether or not any host has more than one server

  ArrayList<String> tables;
  RegionInfo[] regions;
  Deque<BalancerRegionLoad>[] regionLoads;
  private RegionLocationFinder regionFinder;

  int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality

  int[] serverIndexToHostIndex; // serverIndex -> host index
  int[] serverIndexToRackIndex; // serverIndex -> rack index

  int[][] regionsPerServer; // serverIndex -> region list
  int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
  int[][] regionsPerHost; // hostIndex -> list of regions
  int[][] regionsPerRack; // rackIndex -> region list
  Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated
                                                       // replicas by primary region index
  Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by
                                                     // primary region index
  Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by
                                                     // primary region index

  int[][] serversPerHost; // hostIndex -> list of server indexes
  int[][] serversPerRack; // rackIndex -> list of server indexes
  int[] regionIndexToServerIndex; // regionIndex -> serverIndex
  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
  int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
  int[] numRegionsPerTable; // tableIndex -> region count
  int[] numMaxRegionsPerTable; // tableIndex -> max number of regions in a single RS
  int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
  boolean hasRegionReplicas = false; // whether there are regions with replicas

  Integer[] serverIndicesSortedByRegionCount;
  Integer[] serverIndicesSortedByLocality;

  Map<Address, Integer> serversToIndex;
  Map<String, Integer> hostsToIndex;
  Map<String, Integer> racksToIndex;
  Map<String, Integer> tablesToIndex;
  Map<RegionInfo, Integer> regionsToIndex;
  float[] localityPerServer;

  int numServers;
  int numHosts;
  int numRacks;
  int numTables;
  int numRegions;

  int numMovedRegions = 0; // num moved regions from the initial configuration
  Map<ServerName, List<RegionInfo>> clusterState;

  private final RackManager rackManager;
  // Maps region -> rackIndex -> locality of region on rack
  private float[][] rackLocalities;
  // Maps localityType -> region -> [server|rack]Index with highest locality
  private int[][] regionsToMostLocalEntities;

  /** Fallback rack manager that places every server on the same, unknown rack. */
  static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      return UNKNOWN_RACK;
    }
  }

  /**
   * Builds the cluster state from the current assignments, with no unassigned regions.
   * @param clusterState server -> regions currently hosted by that server
   * @param loads        region name (full or encoded) -> recorded region loads; may be null
   * @param regionFinder used to resolve block locations and locality; may be null
   * @param rackManager  resolves the rack of a server; when null a {@link DefaultRackManager} is
   *                     used
   */
  BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
    Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder,
    RackManager rackManager) {
    this(null, clusterState, loads, regionFinder, rackManager);
  }

  /**
   * Builds the index/array based view of the cluster.
   * <p>
   * Servers are indexed by host:port ({@link Address}), so several {@link ServerName}s that differ
   * only by start code share one server index. Entries of {@code clusterState} keyed by a null
   * server name are logged and skipped entirely (their regions are not tracked).
   * @param unassignedRegions regions that are not hosted anywhere; registered with server index
   *                          -1. Null is treated as empty.
   * @param clusterState      server -> regions currently hosted by that server
   * @param loads             region name (full or encoded) -> recorded region loads; may be null
   * @param regionFinder      used to resolve block locations and locality; may be null
   * @param rackManager       resolves the rack of a server; when null a
   *                          {@link DefaultRackManager} is used
   */
  @SuppressWarnings("unchecked")
  BalancerClusterState(Collection<RegionInfo> unassignedRegions,
    Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
    RegionLocationFinder regionFinder, RackManager rackManager) {
    if (unassignedRegions == null) {
      unassignedRegions = Collections.emptyList();
    }

    serversToIndex = new HashMap<>();
    hostsToIndex = new HashMap<>();
    racksToIndex = new HashMap<>();
    tablesToIndex = new HashMap<>();

    // TODO: We should get the list of tables from master
    tables = new ArrayList<>();
    this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

    numRegions = 0;

    List<List<Integer>> serversPerHostList = new ArrayList<>();
    List<List<Integer>> serversPerRackList = new ArrayList<>();
    this.clusterState = clusterState;
    this.regionFinder = regionFinder;

    // Use servername and port as there can be dead servers in this list. We want everything with
    // a matching hostname and port to have the same index.
    for (ServerName sn : clusterState.keySet()) {
      if (sn == null) {
        LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); "
          + "skipping; unassigned regions?");
        if (LOG.isTraceEnabled()) {
          LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
        }
        continue;
      }
      Address address = sn.getAddress();
      Integer knownServerIndex = serversToIndex.get(address);
      boolean firstSighting = knownServerIndex == null;
      int serverIndex = firstSighting ? numServers++ : knownServerIndex;
      if (firstSighting) {
        serversToIndex.put(address, serverIndex);
      }

      String host = sn.getHostname();
      if (!hostsToIndex.containsKey(host)) {
        hostsToIndex.put(host, numHosts++);
        serversPerHostList.add(new ArrayList<>(1));
      }

      String rack = this.rackManager.getRack(sn);
      if (!racksToIndex.containsKey(rack)) {
        racksToIndex.put(rack, numRacks++);
        serversPerRackList.add(new ArrayList<>());
      }

      // Only list a server index once per host/rack. A second ServerName with the same host:port
      // (different start code) maps to the same index; listing it again would make the host look
      // like it runs several servers and would double count its regions per host and per rack.
      if (firstSighting) {
        serversPerHostList.get(hostsToIndex.get(host)).add(serverIndex);
        serversPerRackList.get(racksToIndex.get(rack)).add(serverIndex);
      }
    }
    LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);

    // Count how many regions there are. Regions listed under a null server name are skipped here
    // because they are skipped everywhere else too; counting them would leave null slots in the
    // region arrays.
    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
      if (entry.getKey() != null) {
        numRegions += entry.getValue().size();
      }
    }
    numRegions += unassignedRegions.size();

    regionsToIndex = new HashMap<>(numRegions);
    servers = new ServerName[numServers];
    serversPerHost = new int[numHosts][];
    serversPerRack = new int[numRacks][];
    regions = new RegionInfo[numRegions];
    regionIndexToServerIndex = new int[numRegions];
    initialRegionIndexToServerIndex = new int[numRegions];
    regionIndexToTableIndex = new int[numRegions];
    regionIndexToPrimaryIndex = new int[numRegions];
    regionLoads = new Deque[numRegions];

    regionLocations = new int[numRegions][];
    serverIndicesSortedByRegionCount = new Integer[numServers];
    serverIndicesSortedByLocality = new Integer[numServers];
    localityPerServer = new float[numServers];

    serverIndexToHostIndex = new int[numServers];
    serverIndexToRackIndex = new int[numServers];
    regionsPerServer = new int[numServers][];
    serverIndexToRegionsOffset = new int[numServers];
    regionsPerHost = new int[numHosts][];
    regionsPerRack = new int[numRacks][];
    colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers];
    colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts];
    colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks];

    int regionIndex = 0;
    int regionPerServerIndex = 0;

    // Pick the representative ServerName per index and size the per-server region arrays.
    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
      if (entry.getKey() == null) {
        LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
        continue;
      }
      int serverIndex = serversToIndex.get(entry.getKey().getAddress());

      // keep the servername if this is the first server name for this hostname
      // or this servername has the newest startcode.
      if (
        servers[serverIndex] == null
          || servers[serverIndex].getStartcode() < entry.getKey().getStartcode()
      ) {
        servers[serverIndex] = entry.getKey();
      }

      if (regionsPerServer[serverIndex] != null) {
        // there is another server with the same hostAndPort in ClusterState.
        // allocate the array for the total size
        regionsPerServer[serverIndex] =
          new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
      } else {
        regionsPerServer[serverIndex] = new int[entry.getValue().size()];
      }
      // colocatedReplicaCountsPerServer is built further down, once primary indexes are known.
      serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
      serverIndicesSortedByLocality[serverIndex] = serverIndex;
    }

    hosts = new String[numHosts];
    for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) {
      hosts[entry.getValue()] = entry.getKey();
    }
    racks = new String[numRacks];
    for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) {
      racks[entry.getValue()] = entry.getKey();
    }

    // Register every assigned region and fill the per-server region lists.
    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
      if (entry.getKey() == null) {
        // Already warned about above; a null server name has no index to attach regions to.
        continue;
      }
      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
      // Continue after whatever an earlier ServerName with the same host:port already wrote.
      regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];

      int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
      serverIndexToHostIndex[serverIndex] = hostIndex;

      int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
      serverIndexToRackIndex[serverIndex] = rackIndex;

      for (RegionInfo region : entry.getValue()) {
        registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
        regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
        regionIndex++;
      }
      serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
    }

    for (RegionInfo region : unassignedRegions) {
      registerRegion(region, regionIndex, -1, loads, regionFinder);
      regionIndex++;
    }

    for (int i = 0; i < serversPerHostList.size(); i++) {
      serversPerHost[i] = new int[serversPerHostList.get(i).size()];
      for (int j = 0; j < serversPerHost[i].length; j++) {
        serversPerHost[i][j] = serversPerHostList.get(i).get(j);
        LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i);
      }
      if (serversPerHost[i].length > 1) {
        multiServersPerHost = true;
      }
    }

    for (int i = 0; i < serversPerRackList.size(); i++) {
      serversPerRack[i] = new int[serversPerRackList.get(i).size()];
      for (int j = 0; j < serversPerRack[i].length; j++) {
        serversPerRack[i][j] = serversPerRackList.get(i).get(j);
        // Same level as the per-host message above; this runs once per server per construction.
        LOG.debug("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
      }
    }

    numTables = tables.size();
    LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts,
      numRacks);
    // Freshly allocated int arrays are already zero-filled.
    numRegionsPerServerPerTable = new int[numTables][numServers];
    numRegionsPerTable = new int[numTables];

    for (int i = 0; i < regionIndexToServerIndex.length; i++) {
      if (regionIndexToServerIndex[i] >= 0) {
        numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++;
        numRegionsPerTable[regionIndexToTableIndex[i]]++;
      }
    }

    for (int i = 0; i < regions.length; i++) {
      RegionInfo info = regions[i];
      if (RegionReplicaUtil.isDefaultReplica(info)) {
        regionIndexToPrimaryIndex[i] = i;
      } else {
        hasRegionReplicas = true;
        RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
        // -1 when the primary replica is not part of this cluster state
        regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
      }
    }

    for (int i = 0; i < regionsPerServer.length; i++) {
      colocatedReplicaCountsPerServer[i] =
        new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
      for (int j = 0; j < regionsPerServer[i].length; j++) {
        int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
        colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex);
      }
    }
    // compute regionsPerHost
    if (multiServersPerHost) {
      populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost,
        serversPerHost);
    }

    // compute regionsPerRack
    if (numRacks > 1) {
      populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
        serversPerRack);
    }
  }

  /**
   * Fills the per-location (host or rack) region lists and colocated replica counters by
   * aggregating the per-server region lists of every server in that location.
   * @param regionsPerLocation                output: locationIndex -> region indexes
   * @param colocatedReplicaCountsPerLocation output: locationIndex -> counts by primary region index
   * @param serversPerLocation                locationIndex -> server indexes in that location
   */
  private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) {
    // First pass: size the per-location containers.
    for (int i = 0; i < serversPerLocation.length; i++) {
      int numRegionsPerLocation = 0;
      for (int j = 0; j < serversPerLocation[i].length; j++) {
        numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length;
      }
      regionsPerLocation[i] = new int[numRegionsPerLocation];
      colocatedReplicaCountsPerLocation[i] =
        new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0);
    }

    // Second pass: copy region indexes and count replicas per primary.
    for (int i = 0; i < serversPerLocation.length; i++) {
      int numRegionPerLocationIndex = 0;
      for (int j = 0; j < serversPerLocation[i].length; j++) {
        for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) {
          int region = regionsPerServer[serversPerLocation[i][j]][k];
          regionsPerLocation[i][numRegionPerLocationIndex] = region;
          int primaryIndex = regionIndexToPrimaryIndex[region];
          colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex);
          numRegionPerLocationIndex++;
        }
      }
    }
  }

  /**
   * Helper for Cluster constructor to handle a region: records its table, its (initial) server, its
   * load history and, when a region finder is available, its top block locations.
   * @param region       the region to register
   * @param regionIndex  index assigned to the region
   * @param serverIndex  index of the hosting server, or -1 when the region is unassigned
   * @param loads        region name (full or encoded) -> recorded region loads; may be null
   * @param regionFinder source of block locations; may be null
   */
  private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
    Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder) {
    String tableName = region.getTable().getNameAsString();
    if (!tablesToIndex.containsKey(tableName)) {
      tables.add(tableName);
      tablesToIndex.put(tableName, tablesToIndex.size());
    }
    int tableIndex = tablesToIndex.get(tableName);

    regionsToIndex.put(region, regionIndex);
    regions[regionIndex] = region;
    regionIndexToServerIndex[regionIndex] = serverIndex;
    initialRegionIndexToServerIndex[regionIndex] = serverIndex;
    regionIndexToTableIndex[regionIndex] = tableIndex;

    // region load
    if (loads != null) {
      Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
      // That could have failed if the RegionLoad is using the other regionName
      if (rl == null) {
        // Try getting the region load using encoded name.
        rl = loads.get(region.getEncodedName());
      }
      regionLoads[regionIndex] = rl;
    }

    if (regionFinder != null) {
      // region location
      List<ServerName> loc = regionFinder.getTopBlockLocations(region);
      regionLocations[regionIndex] = new int[loc.size()];
      for (int i = 0; i < loc.size(); i++) {
        ServerName location = loc.get(i);
        // -1 marks a location that is null or not one of the servers in this cluster state
        regionLocations[regionIndex][i] =
          location == null ? -1 : serversToIndex.getOrDefault(location.getAddress(), -1);
      }
    }
  }

  /**
   * Returns true iff a given server has less regions than the balanced amount, i.e. fewer than
   * {@code numRegions / numServers} (integer division).
   */
  public boolean serverHasTooFewRegions(int server) {
    int minLoad = this.numRegions / numServers;
    return getNumRegions(server) < minLoad;
  }

  /**
   * Retrieves and lazily initializes a field storing the locality of every region/rack
   * combination
   * @return region index -> rack index -> locality
   */
  public float[][] getOrComputeRackLocalities() {
    if (rackLocalities == null || regionsToMostLocalEntities == null) {
      computeCachedLocalities();
    }
    return rackLocalities;
  }

  /**
   * Lazily initializes and retrieves a mapping of region -> entity (server or rack, depending on
   * {@code type}) for which the region has the highest locality
   */
  public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) {
    if (rackLocalities == null || regionsToMostLocalEntities == null) {
      computeCachedLocalities();
    }
    return regionsToMostLocalEntities[type.ordinal()];
  }

  /**
   * Looks up the locality of a region on a server or rack. Server locality is computed on each
   * call; rack locality is read from the rack locality cache, which is created if it does not
   * already exist.
   * @param region region index
   * @param entity server index or rack index, depending on {@code type}
   * @param type   which kind of entity {@code entity} refers to
   * @throws IllegalArgumentException for a locality type other than SERVER or RACK
   */
  public float getOrComputeLocality(int region, int entity,
    BalancerClusterState.LocalityType type) {
    switch (type) {
      case SERVER:
        return getLocalityOfRegion(region, entity);
      case RACK:
        return getOrComputeRackLocalities()[region][entity];
      default:
        throw new IllegalArgumentException("Unsupported LocalityType: " + type);
    }
  }

  /**
   * Returns locality weighted by region size in MB. Will create locality cache if it does not
   * already exist.
   */
  public double getOrComputeWeightedLocality(int region, int server,
    BalancerClusterState.LocalityType type) {
    return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
  }

  /**
   * Returns the size in MB from the most recent RegionLoad for region, or 0 when no load has been
   * recorded for it.
   */
  public int getRegionSizeMB(int region) {
    Deque<BalancerRegionLoad> load = regionLoads[region];
    // No load recorded means the region has no actual data on disk as far as we know. An empty
    // deque is treated the same way; getLast() would throw NoSuchElementException on it.
    if (load == null || load.isEmpty()) {
      return 0;
    }
    return load.getLast().getStorefileSizeMB();
  }

  /**
   * Computes and caches the locality for each region/rack combinations, as well as storing a
   * mapping of region -> server and region -> rack such that server and rack have the highest
   * locality for region
   */
  private void computeCachedLocalities() {
    rackLocalities = new float[numRegions][numRacks];
    regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];

    // Compute localities and find most local server per region
    for (int region = 0; region < numRegions; region++) {
      int serverWithBestLocality = 0;
      float bestLocalityForRegion = 0;
      for (int server = 0; server < numServers; server++) {
        // Aggregate per-rack locality: a rack's locality is the mean over its servers
        float locality = getLocalityOfRegion(region, server);
        int rack = serverIndexToRackIndex[server];
        int numServersInRack = serversPerRack[rack].length;
        rackLocalities[region][rack] += locality / numServersInRack;

        if (locality > bestLocalityForRegion) {
          serverWithBestLocality = server;
          bestLocalityForRegion = locality;
        }
      }
      regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;

      // Find most local rack per region
      int rackWithBestLocality = 0;
      float bestRackLocalityForRegion = 0.0f;
      for (int rack = 0; rack < numRacks; rack++) {
        float rackLocality = rackLocalities[region][rack];
        if (rackLocality > bestRackLocalityForRegion) {
          bestRackLocalityForRegion = rackLocality;
          rackWithBestLocality = rack;
        }
      }
      regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
    }
  }

  /**
   * Maps region index to rack index. The region must currently be assigned: an unassigned region
   * has server index -1, which is not a valid index into the rack mapping.
   */
  public int getRackForRegion(int region) {
    return serverIndexToRackIndex[regionIndexToServerIndex[region]];
  }

  /** The kind of entity a locality value refers to. */
  enum LocalityType {
    SERVER,
    RACK
  }

  /**
   * Applies a balance action to this cluster state, updating the per-server region lists and all
   * derived bookkeeping through {@link #regionMoved(int, int, int)}.
   * @param action the action to apply; a NULL action is a no-op
   * @throws RuntimeException if the action type is not one of the handled types
   */
  public void doAction(BalanceAction action) {
    switch (action.getType()) {
      case NULL:
        break;
      case ASSIGN_REGION:
        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
        assert action instanceof AssignRegionAction : action.getClass();
        AssignRegionAction ar = (AssignRegionAction) action;
        regionsPerServer[ar.getServer()] =
          addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
        regionMoved(ar.getRegion(), -1, ar.getServer());
        break;
      case MOVE_REGION:
        assert action instanceof MoveRegionAction : action.getClass();
        MoveRegionAction mra = (MoveRegionAction) action;
        regionsPerServer[mra.getFromServer()] =
          removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
        regionsPerServer[mra.getToServer()] =
          addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
        regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
        break;
      case SWAP_REGIONS:
        assert action instanceof SwapRegionsAction : action.getClass();
        SwapRegionsAction a = (SwapRegionsAction) action;
        regionsPerServer[a.getFromServer()] =
          replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
        regionsPerServer[a.getToServer()] =
          replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
        regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
        regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
        break;
      default:
        throw new RuntimeException("Uknown action:" + action.getType());
    }
  }

  /**
   * Return true if the placement of region on server would lower the availability of the region in
   * question
   * @param regionInfo the region to place; must be a region known to this cluster state
   * @param serverName the candidate server; an unknown server yields false
   * @return true or false
   */
  boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
    if (!serversToIndex.containsKey(serverName.getAddress())) {
      return false; // safeguard against race between cluster.servers and servers from LB method
      // args
    }
    int server = serversToIndex.get(serverName.getAddress());
    int region = regionsToIndex.get(regionInfo);

    // Region replicas for same region should better assign to different servers
    for (int i : regionsPerServer[server]) {
      RegionInfo otherRegionInfo = regions[i];
      if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
        return true;
      }
    }

    int primary = regionIndexToPrimaryIndex[region];
    if (primary == -1) {
      return false;
    }
    // there is a subset relation for server < host < rack
    // check server first
    int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary);
    if (result != 0) {
      return result > 0;
    }

    // check host
    if (multiServersPerHost) {
      result = checkLocationForPrimary(serverIndexToHostIndex[server],
        colocatedReplicaCountsPerHost, primary);
      if (result != 0) {
        return result > 0;
      }
    }

    // check rack
    if (numRacks > 1) {
      result = checkLocationForPrimary(serverIndexToRackIndex[server],
        colocatedReplicaCountsPerRack, primary);
      if (result != 0) {
        return result > 0;
      }
    }
    return false;
  }

  /**
   * Common method for better solution check.
   * @param location                          index of the candidate server, host or rack
   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerServer,
   *                                          colocatedReplicaCountsPerHost or
   *                                          colocatedReplicaCountsPerRack
   * @param primary                           region index of the primary replica
   * @return 1 when the location already holds a replica of the primary and another location
   *         without one exists (there is a better location), -1 when the location holds a replica
   *         but every other location does too (no better location), 0 when the location holds no
   *         replica of the primary (unknown at this level)
   */
  private int checkLocationForPrimary(int location,
    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) {
    if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) {
      // check for whether there are other Locations that we can place this region
      for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) {
        if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) {
          return 1; // meaning there is a better Location
        }
      }
      return -1; // there is not a better Location to place this
    }
    return 0;
  }

  /**
   * Records the assignment of a region to a server by applying an {@link AssignRegionAction}.
   * Does nothing when the server is not part of this cluster state.
   * @param regionInfo the region being assigned; must be a region known to this cluster state
   * @param serverName the server receiving the region
   */
  void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
    if (!serversToIndex.containsKey(serverName.getAddress())) {
      return;
    }
    int server = serversToIndex.get(serverName.getAddress());
    int region = regionsToIndex.get(regionInfo);
    doAction(new AssignRegionAction(region, server));
  }

  /**
   * Updates the derived bookkeeping after a region changed servers: current assignment, moved
   * region count, per-table per-server counts, and colocated replica counters for server, host and
   * rack. The per-server region lists themselves are updated by the caller.
   * @param region    region index
   * @param oldServer previous server index, or a negative value when the region was unassigned
   * @param newServer new server index
   */
  void regionMoved(int region, int oldServer, int newServer) {
    regionIndexToServerIndex[region] = newServer;
    if (initialRegionIndexToServerIndex[region] == newServer) {
      numMovedRegions--; // region moved back to original location
    } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
      numMovedRegions++; // region moved from original location
    }
    int tableIndex = regionIndexToTableIndex[region];
    if (oldServer >= 0) {
      numRegionsPerServerPerTable[tableIndex][oldServer]--;
    }
    numRegionsPerServerPerTable[tableIndex][newServer]++;

    // update for servers
    int primary = regionIndexToPrimaryIndex[region];
    if (oldServer >= 0) {
      colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary);
    }
    colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary);

    // update for hosts
    if (multiServersPerHost) {
      updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost,
        oldServer, newServer, primary, region);
    }

    // update for racks
    if (numRacks > 1) {
      updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
        oldServer, newServer, primary, region);
    }
  }

  /**
   * Common method for per host and per rack region index updates when a region is moved. Nothing
   * changes when the old and the new server are in the same location.
   * @param serverIndexToLocation             serverIndexToHostIndex or serverIndexToRackIndex
   * @param regionsPerLocation                regionsPerHost or regionsPerRack
   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
   *                                          colocatedReplicaCountsPerRack
   * @param oldServer                         previous server index, or negative when unassigned
   * @param newServer                         new server index
   * @param primary                           region index of the primary replica of the region
   * @param region                            index of the moved region
   */
  private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation,
    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer,
    int primary, int region) {
    int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1;
    int newLocation = serverIndexToLocation[newServer];
    if (newLocation != oldLocation) {
      regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region);
      colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary);
      if (oldLocation >= 0) {
        regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region);
        colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary);
      }
    }
  }

  /**
   * Returns a copy of {@code regions} with the first occurrence of {@code regionIndex} removed.
   * {@code regions} must contain {@code regionIndex}; otherwise the copy, which is one element
   * shorter than the input, cannot hold all elements and an array size/index exception is thrown.
   */
  int[] removeRegion(int[] regions, int regionIndex) {
    // TODO: this maybe costly. Consider using linked lists
    int[] newRegions = new int[regions.length - 1];
    int i = 0;
    for (i = 0; i < regions.length; i++) {
      if (regions[i] == regionIndex) {
        break;
      }
      newRegions[i] = regions[i];
    }
    System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
    return newRegions;
  }

  /** Returns a copy of {@code regions} with {@code regionIndex} appended at the end. */
  int[] addRegion(int[] regions, int regionIndex) {
    int[] newRegions = Arrays.copyOf(regions, regions.length + 1);
    newRegions[newRegions.length - 1] = regionIndex;
    return newRegions;
  }

  /**
   * Returns a copy of {@code regions} with {@code regionIndex} inserted before the first element
   * that is greater than it (at the end if there is none). Keeps an ascending array ascending.
   */
  int[] addRegionSorted(int[] regions, int regionIndex) {
    int[] newRegions = new int[regions.length + 1];
    int i = 0;
    for (i = 0; i < regions.length; i++) { // find the index to insert
      if (regions[i] > regionIndex) {
        break;
      }
    }
    System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
    System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
    newRegions[i] = regionIndex;

    return newRegions;
  }

  /**
   * Replaces, in place, the first occurrence of {@code regionIndex} with {@code newRegionIndex}.
   * The array is left untouched when {@code regionIndex} is not present.
   * @return the same array instance that was passed in
   */
  int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
    int i = 0;
    for (i = 0; i < regions.length; i++) {
      if (regions[i] == regionIndex) {
        regions[i] = newRegionIndex;
        break;
      }
    }
    return regions;
  }

  /** Sorts {@link #serverIndicesSortedByRegionCount} by ascending number of hosted regions. */
  void sortServersByRegionCount() {
    Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
  }

  /** Returns the number of regions currently listed for the given server index. */
  int getNumRegions(int server) {
    return regionsPerServer[server].length;
  }

  /** Returns the comparator that orders server indexes by ascending number of hosted regions. */
  public Comparator<Integer> getNumRegionsComparator() {
    return numRegionsComparator;
  }

  /**
   * Returns whether {@code arr} contains {@code val}. This is a binary search, so the result is
   * only meaningful for an array sorted in ascending order.
   * NOTE(review): {@link #addRegion(int[], int)} appends without sorting, so callers should
   * confirm the array they pass is actually sorted.
   */
  boolean contains(int[] arr, int val) {
    return Arrays.binarySearch(arr, val) >= 0;
  }

  private final Comparator<Integer> numRegionsComparator =
    Comparator.comparingInt(this::getNumRegions);

  /**
   * Finds the region on the given server with the lowest block locality on that server's host,
   * ignoring regions whose block distribution has no weight (empty regions).
   * @param serverIndex index of the server to inspect
   * @return the region index, or -1 when there is no region finder, the server hosts no regions, or
   *         no non-empty region has a locality below 1.0
   */
  int getLowestLocalityRegionOnServer(int serverIndex) {
    if (regionFinder != null) {
      float lowestLocality = 1.0f;
      int lowestLocalityRegionIndex = -1;
      if (regionsPerServer[serverIndex].length == 0) {
        // No regions on that region server
        return -1;
      }
      for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
        int regionIndex = regionsPerServer[serverIndex][j];
        HDFSBlocksDistribution distribution =
          regionFinder.getBlockDistribution(regions[regionIndex]);
        float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
        // skip empty region
        if (distribution.getUniqueBlocksTotalWeight() == 0) {
          continue;
        }
        if (locality < lowestLocality) {
          lowestLocality = locality;
          // position within regionsPerServer[serverIndex], not the region index itself
          lowestLocalityRegionIndex = j;
        }
      }
      if (lowestLocalityRegionIndex == -1) {
        return -1;
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace("Lowest locality region is "
          + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
            .getRegionNameAsString()
          + " with locality " + lowestLocality + " and its region server contains "
          + regionsPerServer[serverIndex].length + " regions");
      }
      return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
    } else {
      return -1;
    }
  }

  /**
   * Returns the block locality of a region on the host of the given server, or 0 when no region
   * finder is available.
   */
  float getLocalityOfRegion(int region, int server) {
    if (regionFinder != null) {
      HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
      return distribution.getBlockLocalityIndex(servers[server].getHostname());
    } else {
      return 0f;
    }
  }

  /** Overrides the tracked number of regions; no array is resized. */
  void setNumRegions(int numRegions) {
    this.numRegions = numRegions;
  }

  /** Overrides the tracked number of regions moved away from their initial server. */
  void setNumMovedRegions(int numMovedRegions) {
    this.numMovedRegions = numMovedRegions;
  }

  @Override
  public String toString() {
    StringBuilder desc = new StringBuilder("Cluster={servers=[");
    for (ServerName sn : servers) {
      desc.append(sn.getAddress().toString()).append(", ");
    }
    desc.append("], serverIndicesSortedByRegionCount=")
      .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
      .append(Arrays.deepToString(regionsPerServer));

    desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
      .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
      .append('}');
    return desc.toString();
  }
}