001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import org.agrona.collections.Hashing; 030import org.agrona.collections.Int2IntCounterMap; 031import org.apache.hadoop.hbase.HDFSBlocksDistribution; 032import org.apache.hadoop.hbase.ServerName; 033import org.apache.hadoop.hbase.client.RegionInfo; 034import org.apache.hadoop.hbase.client.RegionReplicaUtil; 035import org.apache.hadoop.hbase.master.RackManager; 036import org.apache.hadoop.hbase.net.Address; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * An efficient array based implementation similar to ClusterState for keeping the status of the 043 * cluster in terms of region assignment and distribution. LoadBalancers, such as 044 * StochasticLoadBalancer uses this Cluster object because of hundreds of thousands of hashmap 045 * manipulations are very costly, which is why this class uses mostly indexes and arrays. 046 * <p/> 047 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server 048 * topology in terms of server names, hostnames and racks. 049 */ 050@InterfaceAudience.Private 051class BalancerClusterState { 052 053 private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class); 054 055 ServerName[] servers; 056 // ServerName uniquely identifies a region server. multiple RS can run on the same host 057 String[] hosts; 058 String[] racks; 059 boolean multiServersPerHost = false; // whether or not any host has more than one server 060 061 ArrayList<String> tables; 062 RegionInfo[] regions; 063 Deque<BalancerRegionLoad>[] regionLoads; 064 private RegionHDFSBlockLocationFinder regionFinder; 065 066 int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality 067 068 int[] serverIndexToHostIndex; // serverIndex -> host index 069 int[] serverIndexToRackIndex; // serverIndex -> rack index 070 071 int[][] regionsPerServer; // serverIndex -> region list 072 int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list 073 int[][] regionsPerHost; // hostIndex -> list of regions 074 int[][] regionsPerRack; // rackIndex -> region list 075 Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated 076 // replicas by primary region index 077 Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by 078 // primary region index 079 Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by 080 // primary region index 081 082 int[][] serversPerHost; // hostIndex -> list of server indexes 083 int[][] serversPerRack; // rackIndex -> list of server indexes 084 int[] regionIndexToServerIndex; // regionIndex -> serverIndex 085 int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state) 086 int[] regionIndexToTableIndex; // regionIndex -> tableIndex 087 int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> tableIndex -> # regions 088 int[] numRegionsPerTable; // tableIndex -> region count 089 double[] meanRegionsPerTable; // mean region count per table 090 int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary 091 boolean hasRegionReplicas = false; // whether there is regions with replicas 092 093 Integer[] serverIndicesSortedByRegionCount; 094 Integer[] serverIndicesSortedByLocality; 095 096 Map<Address, Integer> serversToIndex; 097 Map<String, Integer> hostsToIndex; 098 Map<String, Integer> racksToIndex; 099 Map<String, Integer> tablesToIndex; 100 Map<RegionInfo, Integer> regionsToIndex; 101 float[] localityPerServer; 102 103 int numServers; 104 int numHosts; 105 int numRacks; 106 int numTables; 107 int numRegions; 108 109 int numMovedRegions = 0; // num moved regions from the initial configuration 110 Map<ServerName, List<RegionInfo>> clusterState; 111 112 private final RackManager rackManager; 113 // Maps region -> rackIndex -> locality of region on rack 114 private float[][] rackLocalities; 115 // Maps localityType -> region -> [server|rack]Index with highest locality 116 private int[][] regionsToMostLocalEntities; 117 118 static class DefaultRackManager extends RackManager { 119 @Override 120 public String getRack(ServerName server) { 121 return UNKNOWN_RACK; 122 } 123 } 124 125 BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, 126 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder, 127 RackManager rackManager) { 128 this(null, clusterState, loads, regionFinder, rackManager); 129 } 130 131 @SuppressWarnings("unchecked") 132 BalancerClusterState(Collection<RegionInfo> unassignedRegions, 133 Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads, 134 RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) { 135 if (unassignedRegions == null) { 136 unassignedRegions = Collections.emptyList(); 137 } 138 139 serversToIndex = new HashMap<>(); 140 hostsToIndex = new HashMap<>(); 141 racksToIndex = new HashMap<>(); 142 tablesToIndex = new HashMap<>(); 143 144 // TODO: We should get the list of tables from master 145 tables = new ArrayList<>(); 146 this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); 147 148 numRegions = 0; 149 150 List<List<Integer>> serversPerHostList = new ArrayList<>(); 151 List<List<Integer>> serversPerRackList = new ArrayList<>(); 152 this.clusterState = clusterState; 153 this.regionFinder = regionFinder; 154 155 // Use servername and port as there can be dead servers in this list. We want everything with 156 // a matching hostname and port to have the same index. 157 for (ServerName sn : clusterState.keySet()) { 158 if (sn == null) { 159 LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " 160 + "skipping; unassigned regions?"); 161 if (LOG.isTraceEnabled()) { 162 LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); 163 } 164 continue; 165 } 166 if (serversToIndex.get(sn.getAddress()) == null) { 167 serversToIndex.put(sn.getAddress(), numServers++); 168 } 169 if (!hostsToIndex.containsKey(sn.getHostname())) { 170 hostsToIndex.put(sn.getHostname(), numHosts++); 171 serversPerHostList.add(new ArrayList<>(1)); 172 } 173 174 int serverIndex = serversToIndex.get(sn.getAddress()); 175 int hostIndex = hostsToIndex.get(sn.getHostname()); 176 serversPerHostList.get(hostIndex).add(serverIndex); 177 178 String rack = this.rackManager.getRack(sn); 179 180 if (!racksToIndex.containsKey(rack)) { 181 racksToIndex.put(rack, numRacks++); 182 serversPerRackList.add(new ArrayList<>()); 183 } 184 int rackIndex = racksToIndex.get(rack); 185 serversPerRackList.get(rackIndex).add(serverIndex); 186 } 187 188 LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex); 189 // Count how many regions there are. 190 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 191 numRegions += entry.getValue().size(); 192 } 193 numRegions += unassignedRegions.size(); 194 195 regionsToIndex = new HashMap<>(numRegions); 196 servers = new ServerName[numServers]; 197 serversPerHost = new int[numHosts][]; 198 serversPerRack = new int[numRacks][]; 199 regions = new RegionInfo[numRegions]; 200 regionIndexToServerIndex = new int[numRegions]; 201 initialRegionIndexToServerIndex = new int[numRegions]; 202 regionIndexToTableIndex = new int[numRegions]; 203 regionIndexToPrimaryIndex = new int[numRegions]; 204 regionLoads = new Deque[numRegions]; 205 206 regionLocations = new int[numRegions][]; 207 serverIndicesSortedByRegionCount = new Integer[numServers]; 208 serverIndicesSortedByLocality = new Integer[numServers]; 209 localityPerServer = new float[numServers]; 210 211 serverIndexToHostIndex = new int[numServers]; 212 serverIndexToRackIndex = new int[numServers]; 213 regionsPerServer = new int[numServers][]; 214 serverIndexToRegionsOffset = new int[numServers]; 215 regionsPerHost = new int[numHosts][]; 216 regionsPerRack = new int[numRacks][]; 217 colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers]; 218 colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts]; 219 colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks]; 220 221 int regionIndex = 0, regionPerServerIndex = 0; 222 223 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 224 if (entry.getKey() == null) { 225 LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); 226 continue; 227 } 228 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 229 230 // keep the servername if this is the first server name for this hostname 231 // or this servername has the newest startcode. 232 if ( 233 servers[serverIndex] == null 234 || servers[serverIndex].getStartcode() < entry.getKey().getStartcode() 235 ) { 236 servers[serverIndex] = entry.getKey(); 237 } 238 239 if (regionsPerServer[serverIndex] != null) { 240 // there is another server with the same hostAndPort in ClusterState. 241 // allocate the array for the total size 242 regionsPerServer[serverIndex] = 243 new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; 244 } else { 245 regionsPerServer[serverIndex] = new int[entry.getValue().size()]; 246 } 247 colocatedReplicaCountsPerServer[serverIndex] = 248 new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 249 serverIndicesSortedByRegionCount[serverIndex] = serverIndex; 250 serverIndicesSortedByLocality[serverIndex] = serverIndex; 251 } 252 253 hosts = new String[numHosts]; 254 for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) { 255 hosts[entry.getValue()] = entry.getKey(); 256 } 257 racks = new String[numRacks]; 258 for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) { 259 racks[entry.getValue()] = entry.getKey(); 260 } 261 262 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 263 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 264 regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; 265 266 int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); 267 serverIndexToHostIndex[serverIndex] = hostIndex; 268 269 int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); 270 serverIndexToRackIndex[serverIndex] = rackIndex; 271 272 for (RegionInfo region : entry.getValue()) { 273 registerRegion(region, regionIndex, serverIndex, loads, regionFinder); 274 regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; 275 regionIndex++; 276 } 277 serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex; 278 } 279 280 for (RegionInfo region : unassignedRegions) { 281 registerRegion(region, regionIndex, -1, loads, regionFinder); 282 regionIndex++; 283 } 284 285 if (LOG.isDebugEnabled()) { 286 for (int i = 0; i < numServers; i++) { 287 LOG.debug("server {} has {} regions", i, regionsPerServer[i].length); 288 } 289 } 290 for (int i = 0; i < serversPerHostList.size(); i++) { 291 serversPerHost[i] = new int[serversPerHostList.get(i).size()]; 292 for (int j = 0; j < serversPerHost[i].length; j++) { 293 serversPerHost[i][j] = serversPerHostList.get(i).get(j); 294 LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i); 295 } 296 if (serversPerHost[i].length > 1) { 297 multiServersPerHost = true; 298 } 299 } 300 301 for (int i = 0; i < serversPerRackList.size(); i++) { 302 serversPerRack[i] = new int[serversPerRackList.get(i).size()]; 303 for (int j = 0; j < serversPerRack[i].length; j++) { 304 serversPerRack[i][j] = serversPerRackList.get(i).get(j); 305 LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i); 306 } 307 } 308 309 numTables = tables.size(); 310 LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts, 311 numRacks); 312 numRegionsPerServerPerTable = new int[numTables][numServers]; 313 numRegionsPerTable = new int[numTables]; 314 315 for (int i = 0; i < numTables; i++) { 316 for (int j = 0; j < numServers; j++) { 317 numRegionsPerServerPerTable[i][j] = 0; 318 } 319 } 320 321 for (int i = 0; i < regionIndexToServerIndex.length; i++) { 322 if (regionIndexToServerIndex[i] >= 0) { 323 numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++; 324 numRegionsPerTable[regionIndexToTableIndex[i]]++; 325 } 326 } 327 328 // Avoid repeated computation for planning 329 meanRegionsPerTable = new double[numTables]; 330 331 for (int i = 0; i < numTables; i++) { 332 meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers; 333 } 334 335 for (int i = 0; i < regions.length; i++) { 336 RegionInfo info = regions[i]; 337 if (RegionReplicaUtil.isDefaultReplica(info)) { 338 regionIndexToPrimaryIndex[i] = i; 339 } else { 340 hasRegionReplicas = true; 341 RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); 342 regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); 343 } 344 } 345 346 for (int i = 0; i < regionsPerServer.length; i++) { 347 colocatedReplicaCountsPerServer[i] = 348 new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 349 for (int j = 0; j < regionsPerServer[i].length; j++) { 350 int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; 351 colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex); 352 } 353 } 354 // compute regionsPerHost 355 if (multiServersPerHost) { 356 populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost, 357 serversPerHost); 358 } 359 360 // compute regionsPerRack 361 if (numRacks > 1) { 362 populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack, 363 serversPerRack); 364 } 365 } 366 367 private void populateRegionPerLocationFromServer(int[][] regionsPerLocation, 368 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) { 369 for (int i = 0; i < serversPerLocation.length; i++) { 370 int numRegionsPerLocation = 0; 371 for (int j = 0; j < serversPerLocation[i].length; j++) { 372 numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length; 373 } 374 regionsPerLocation[i] = new int[numRegionsPerLocation]; 375 colocatedReplicaCountsPerLocation[i] = 376 new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0); 377 } 378 379 for (int i = 0; i < serversPerLocation.length; i++) { 380 int numRegionPerLocationIndex = 0; 381 for (int j = 0; j < serversPerLocation[i].length; j++) { 382 for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) { 383 int region = regionsPerServer[serversPerLocation[i][j]][k]; 384 regionsPerLocation[i][numRegionPerLocationIndex] = region; 385 int primaryIndex = regionIndexToPrimaryIndex[region]; 386 colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex); 387 numRegionPerLocationIndex++; 388 } 389 } 390 } 391 392 } 393 394 /** Helper for Cluster constructor to handle a region */ 395 private void registerRegion(RegionInfo region, int regionIndex, int serverIndex, 396 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) { 397 String tableName = region.getTable().getNameAsString(); 398 if (!tablesToIndex.containsKey(tableName)) { 399 tables.add(tableName); 400 tablesToIndex.put(tableName, tablesToIndex.size()); 401 } 402 int tableIndex = tablesToIndex.get(tableName); 403 404 regionsToIndex.put(region, regionIndex); 405 regions[regionIndex] = region; 406 regionIndexToServerIndex[regionIndex] = serverIndex; 407 initialRegionIndexToServerIndex[regionIndex] = serverIndex; 408 regionIndexToTableIndex[regionIndex] = tableIndex; 409 410 // region load 411 if (loads != null) { 412 Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString()); 413 // That could have failed if the RegionLoad is using the other regionName 414 if (rl == null) { 415 // Try getting the region load using encoded name. 416 rl = loads.get(region.getEncodedName()); 417 } 418 regionLoads[regionIndex] = rl; 419 } 420 421 if (regionFinder != null) { 422 // region location 423 List<ServerName> loc = regionFinder.getTopBlockLocations(region); 424 regionLocations[regionIndex] = new int[loc.size()]; 425 for (int i = 0; i < loc.size(); i++) { 426 regionLocations[regionIndex][i] = loc.get(i) == null 427 ? -1 428 : (serversToIndex.get(loc.get(i).getAddress()) == null 429 ? -1 430 : serversToIndex.get(loc.get(i).getAddress())); 431 } 432 } 433 } 434 435 /** 436 * Returns true iff a given server has less regions than the balanced amount 437 */ 438 public boolean serverHasTooFewRegions(int server) { 439 int minLoad = this.numRegions / numServers; 440 int numRegions = getNumRegions(server); 441 return numRegions < minLoad; 442 } 443 444 /** 445 * Retrieves and lazily initializes a field storing the locality of every region/server 446 * combination 447 */ 448 public float[][] getOrComputeRackLocalities() { 449 if (rackLocalities == null || regionsToMostLocalEntities == null) { 450 computeCachedLocalities(); 451 } 452 return rackLocalities; 453 } 454 455 /** 456 * Lazily initializes and retrieves a mapping of region -> server for which region has the highest 457 * the locality 458 */ 459 public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) { 460 if (rackLocalities == null || regionsToMostLocalEntities == null) { 461 computeCachedLocalities(); 462 } 463 return regionsToMostLocalEntities[type.ordinal()]; 464 } 465 466 /** 467 * Looks up locality from cache of localities. Will create cache if it does not already exist. 468 */ 469 public float getOrComputeLocality(int region, int entity, 470 BalancerClusterState.LocalityType type) { 471 switch (type) { 472 case SERVER: 473 return getLocalityOfRegion(region, entity); 474 case RACK: 475 return getOrComputeRackLocalities()[region][entity]; 476 default: 477 throw new IllegalArgumentException("Unsupported LocalityType: " + type); 478 } 479 } 480 481 /** 482 * Returns locality weighted by region size in MB. Will create locality cache if it does not 483 * already exist. 484 */ 485 public double getOrComputeWeightedLocality(int region, int server, 486 BalancerClusterState.LocalityType type) { 487 return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); 488 } 489 490 /** 491 * Returns the size in MB from the most recent RegionLoad for region 492 */ 493 public int getRegionSizeMB(int region) { 494 Deque<BalancerRegionLoad> load = regionLoads[region]; 495 // This means regions have no actual data on disk 496 if (load == null) { 497 return 0; 498 } 499 return regionLoads[region].getLast().getStorefileSizeMB(); 500 } 501 502 /** 503 * Computes and caches the locality for each region/rack combinations, as well as storing a 504 * mapping of region -> server and region -> rack such that server and rack have the highest 505 * locality for region 506 */ 507 private void computeCachedLocalities() { 508 rackLocalities = new float[numRegions][numRacks]; 509 regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; 510 511 // Compute localities and find most local server per region 512 for (int region = 0; region < numRegions; region++) { 513 int serverWithBestLocality = 0; 514 float bestLocalityForRegion = 0; 515 for (int server = 0; server < numServers; server++) { 516 // Aggregate per-rack locality 517 float locality = getLocalityOfRegion(region, server); 518 int rack = serverIndexToRackIndex[server]; 519 int numServersInRack = serversPerRack[rack].length; 520 rackLocalities[region][rack] += locality / numServersInRack; 521 522 if (locality > bestLocalityForRegion) { 523 serverWithBestLocality = server; 524 bestLocalityForRegion = locality; 525 } 526 } 527 regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; 528 529 // Find most local rack per region 530 int rackWithBestLocality = 0; 531 float bestRackLocalityForRegion = 0.0f; 532 for (int rack = 0; rack < numRacks; rack++) { 533 float rackLocality = rackLocalities[region][rack]; 534 if (rackLocality > bestRackLocalityForRegion) { 535 bestRackLocalityForRegion = rackLocality; 536 rackWithBestLocality = rack; 537 } 538 } 539 regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; 540 } 541 542 } 543 544 /** 545 * Maps region index to rack index 546 */ 547 public int getRackForRegion(int region) { 548 return serverIndexToRackIndex[regionIndexToServerIndex[region]]; 549 } 550 551 enum LocalityType { 552 SERVER, 553 RACK 554 } 555 556 public void doAction(BalanceAction action) { 557 switch (action.getType()) { 558 case NULL: 559 break; 560 case ASSIGN_REGION: 561 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings 562 assert action instanceof AssignRegionAction : action.getClass(); 563 AssignRegionAction ar = (AssignRegionAction) action; 564 regionsPerServer[ar.getServer()] = 565 addRegion(regionsPerServer[ar.getServer()], ar.getRegion()); 566 regionMoved(ar.getRegion(), -1, ar.getServer()); 567 break; 568 case MOVE_REGION: 569 assert action instanceof MoveRegionAction : action.getClass(); 570 MoveRegionAction mra = (MoveRegionAction) action; 571 regionsPerServer[mra.getFromServer()] = 572 removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion()); 573 regionsPerServer[mra.getToServer()] = 574 addRegion(regionsPerServer[mra.getToServer()], mra.getRegion()); 575 regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); 576 break; 577 case SWAP_REGIONS: 578 assert action instanceof SwapRegionsAction : action.getClass(); 579 SwapRegionsAction a = (SwapRegionsAction) action; 580 regionsPerServer[a.getFromServer()] = 581 replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion()); 582 regionsPerServer[a.getToServer()] = 583 replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion()); 584 regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); 585 regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); 586 break; 587 default: 588 throw new RuntimeException("Uknown action:" + action.getType()); 589 } 590 } 591 592 /** 593 * Return true if the placement of region on server would lower the availability of the region in 594 * question 595 * @return true or false 596 */ 597 boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { 598 if (!serversToIndex.containsKey(serverName.getAddress())) { 599 return false; // safeguard against race between cluster.servers and servers from LB method 600 // args 601 } 602 int server = serversToIndex.get(serverName.getAddress()); 603 int region = regionsToIndex.get(regionInfo); 604 605 // Region replicas for same region should better assign to different servers 606 for (int i : regionsPerServer[server]) { 607 RegionInfo otherRegionInfo = regions[i]; 608 if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) { 609 return true; 610 } 611 } 612 613 int primary = regionIndexToPrimaryIndex[region]; 614 if (primary == -1) { 615 return false; 616 } 617 // there is a subset relation for server < host < rack 618 // check server first 619 int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary); 620 if (result != 0) { 621 return result > 0; 622 } 623 624 // check host 625 if (multiServersPerHost) { 626 result = checkLocationForPrimary(serverIndexToHostIndex[server], 627 colocatedReplicaCountsPerHost, primary); 628 if (result != 0) { 629 return result > 0; 630 } 631 } 632 633 // check rack 634 if (numRacks > 1) { 635 result = checkLocationForPrimary(serverIndexToRackIndex[server], 636 colocatedReplicaCountsPerRack, primary); 637 if (result != 0) { 638 return result > 0; 639 } 640 } 641 return false; 642 } 643 644 /** 645 * Common method for better solution check. 646 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 647 * colocatedReplicaCountsPerRack 648 * @return 1 for better, -1 for no better, 0 for unknown 649 */ 650 private int checkLocationForPrimary(int location, 651 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) { 652 if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) { 653 // check for whether there are other Locations that we can place this region 654 for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) { 655 if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) { 656 return 1; // meaning there is a better Location 657 } 658 } 659 return -1; // there is not a better Location to place this 660 } 661 return 0; 662 } 663 664 void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { 665 if (!serversToIndex.containsKey(serverName.getAddress())) { 666 return; 667 } 668 int server = serversToIndex.get(serverName.getAddress()); 669 int region = regionsToIndex.get(regionInfo); 670 doAction(new AssignRegionAction(region, server)); 671 } 672 673 void regionMoved(int region, int oldServer, int newServer) { 674 regionIndexToServerIndex[region] = newServer; 675 if (initialRegionIndexToServerIndex[region] == newServer) { 676 numMovedRegions--; // region moved back to original location 677 } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { 678 numMovedRegions++; // region moved from original location 679 } 680 int tableIndex = regionIndexToTableIndex[region]; 681 if (oldServer >= 0) { 682 numRegionsPerServerPerTable[tableIndex][oldServer]--; 683 } 684 numRegionsPerServerPerTable[tableIndex][newServer]++; 685 686 // update for servers 687 int primary = regionIndexToPrimaryIndex[region]; 688 if (oldServer >= 0) { 689 colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary); 690 } 691 colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary); 692 693 // update for hosts 694 if (multiServersPerHost) { 695 updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost, 696 oldServer, newServer, primary, region); 697 } 698 699 // update for racks 700 if (numRacks > 1) { 701 updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack, 702 oldServer, newServer, primary, region); 703 } 704 } 705 706 /** 707 * Common method for per host and per Location region index updates when a region is moved. 708 * @param serverIndexToLocation serverIndexToHostIndex or serverIndexToLocationIndex 709 * @param regionsPerLocation regionsPerHost or regionsPerLocation 710 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 711 * colocatedReplicaCountsPerRack 712 */ 713 private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation, 714 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer, 715 int primary, int region) { 716 int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1; 717 int newLocation = serverIndexToLocation[newServer]; 718 if (newLocation != oldLocation) { 719 regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region); 720 colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary); 721 if (oldLocation >= 0) { 722 regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region); 723 colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary); 724 } 725 } 726 727 } 728 729 int[] removeRegion(int[] regions, int regionIndex) { 730 // TODO: this maybe costly. Consider using linked lists 731 int[] newRegions = new int[regions.length - 1]; 732 int i = 0; 733 for (i = 0; i < regions.length; i++) { 734 if (regions[i] == regionIndex) { 735 break; 736 } 737 newRegions[i] = regions[i]; 738 } 739 System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i); 740 return newRegions; 741 } 742 743 int[] addRegion(int[] regions, int regionIndex) { 744 int[] newRegions = new int[regions.length + 1]; 745 System.arraycopy(regions, 0, newRegions, 0, regions.length); 746 newRegions[newRegions.length - 1] = regionIndex; 747 return newRegions; 748 } 749 750 int[] addRegionSorted(int[] regions, int regionIndex) { 751 int[] newRegions = new int[regions.length + 1]; 752 int i = 0; 753 for (i = 0; i < regions.length; i++) { // find the index to insert 754 if (regions[i] > regionIndex) { 755 break; 756 } 757 } 758 System.arraycopy(regions, 0, newRegions, 0, i); // copy first half 759 System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half 760 newRegions[i] = regionIndex; 761 762 return newRegions; 763 } 764 765 int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { 766 int i = 0; 767 for (i = 0; i < regions.length; i++) { 768 if (regions[i] == regionIndex) { 769 regions[i] = newRegionIndex; 770 break; 771 } 772 } 773 return regions; 774 } 775 776 void sortServersByRegionCount() { 777 Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); 778 } 779 780 int getNumRegions(int server) { 781 return regionsPerServer[server].length; 782 } 783 784 boolean contains(int[] arr, int val) { 785 return Arrays.binarySearch(arr, val) >= 0; 786 } 787 788 private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); 789 790 public Comparator<Integer> getNumRegionsComparator() { 791 return numRegionsComparator; 792 } 793 794 int getLowestLocalityRegionOnServer(int serverIndex) { 795 if (regionFinder != null) { 796 float lowestLocality = 1.0f; 797 int lowestLocalityRegionIndex = -1; 798 if (regionsPerServer[serverIndex].length == 0) { 799 // No regions on that region server 800 return -1; 801 } 802 for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { 803 int regionIndex = regionsPerServer[serverIndex][j]; 804 HDFSBlocksDistribution distribution = 805 regionFinder.getBlockDistribution(regions[regionIndex]); 806 float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); 807 // skip empty region 808 if (distribution.getUniqueBlocksTotalWeight() == 0) { 809 continue; 810 } 811 if (locality < lowestLocality) { 812 lowestLocality = locality; 813 lowestLocalityRegionIndex = j; 814 } 815 } 816 if (lowestLocalityRegionIndex == -1) { 817 return -1; 818 } 819 if (LOG.isTraceEnabled()) { 820 LOG.trace("Lowest locality region is " 821 + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]] 822 .getRegionNameAsString() 823 + " with locality " + lowestLocality + " and its region server contains " 824 + regionsPerServer[serverIndex].length + " regions"); 825 } 826 return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; 827 } else { 828 return -1; 829 } 830 } 831 832 float getLocalityOfRegion(int region, int server) { 833 if (regionFinder != null) { 834 HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); 835 return distribution.getBlockLocalityIndex(servers[server].getHostname()); 836 } else { 837 return 0f; 838 } 839 } 840 841 void setNumRegions(int numRegions) { 842 this.numRegions = numRegions; 843 } 844 845 void setNumMovedRegions(int numMovedRegions) { 846 this.numMovedRegions = numMovedRegions; 847 } 848 849 @Override 850 public String toString() { 851 StringBuilder desc = new StringBuilder("Cluster={servers=["); 852 for (ServerName sn : servers) { 853 desc.append(sn.getAddress().toString()).append(", "); 854 } 855 desc.append("], serverIndicesSortedByRegionCount=") 856 .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=") 857 .append(Arrays.deepToString(regionsPerServer)); 858 859 desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) 860 .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions) 861 .append('}'); 862 return desc.toString(); 863 } 864}