001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import org.agrona.collections.Hashing; 030import org.agrona.collections.Int2IntCounterMap; 031import org.apache.hadoop.hbase.HDFSBlocksDistribution; 032import org.apache.hadoop.hbase.ServerName; 033import org.apache.hadoop.hbase.client.RegionInfo; 034import org.apache.hadoop.hbase.client.RegionReplicaUtil; 035import org.apache.hadoop.hbase.master.RackManager; 036import org.apache.hadoop.hbase.net.Address; 037import org.apache.hadoop.hbase.util.Pair; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * An efficient array based implementation similar to ClusterState for keeping the status of the 044 * cluster in terms of region assignment and distribution. 
LoadBalancers such as StochasticLoadBalancer use this cluster object because hundreds of
 * thousands of hashmap manipulations are very costly, which is why this class mostly uses indexes
 * and arrays.
 * <p/>
 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
 * topology in terms of server names, hostnames and racks.
 */
@InterfaceAudience.Private
class BalancerClusterState {

  private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);

  ServerName[] servers;
  // ServerName uniquely identifies a region server. Multiple RS can run on the same host.
  String[] hosts;
  String[] racks;
  boolean multiServersPerHost = false; // whether or not any host has more than one server

  ArrayList<String> tables;
  RegionInfo[] regions;
  Deque<BalancerRegionLoad>[] regionLoads;
  private RegionHDFSBlockLocationFinder regionFinder;

  int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality

  int[] serverIndexToHostIndex; // serverIndex -> host index
  int[] serverIndexToRackIndex; // serverIndex -> rack index

  int[][] regionsPerServer; // serverIndex -> region list
  int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
  int[][] regionsPerHost; // hostIndex -> list of regions (filled only if multiServersPerHost)
  int[][] regionsPerRack; // rackIndex -> region list (filled only if numRacks > 1)
  Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated
                                                       // replicas by primary region index
  Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by
                                                     // primary region index
  Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by
                                                     // primary region index

  int[][] serversPerHost; // hostIndex -> list of server indexes
  int[][] serversPerRack; // rackIndex -> list of server indexes
  int[] regionIndexToServerIndex; // regionIndex -> serverIndex (-1 for an unassigned region)
  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
  int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
  int[] numRegionsPerTable; // tableIndex -> region count
  double[] meanRegionsPerTable; // tableIndex -> mean region count per server
  int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
  boolean hasRegionReplicas = false; // whether there are regions with replicas

  Integer[] serverIndicesSortedByRegionCount;
  Integer[] serverIndicesSortedByLocality;

  Map<Address, Integer> serversToIndex;
  Map<String, Integer> hostsToIndex;
  Map<String, Integer> racksToIndex;
  Map<String, Integer> tablesToIndex;
  Map<RegionInfo, Integer> regionsToIndex;
  float[] localityPerServer;

  int numServers;
  int numHosts;
  int numRacks;
  int numTables;
  int numRegions;

  int numMovedRegions = 0; // num moved regions from the initial configuration
  Map<ServerName, List<RegionInfo>> clusterState;

  private final RackManager rackManager;
  // Maps region -> rackIndex -> locality of region on rack
  private float[][] rackLocalities;
  // Maps localityType -> region -> [server|rack]Index with highest locality
  private int[][] regionsToMostLocalEntities;
  // Maps (regionIndex, serverIndex) -> regionCacheRatio of a region on a server
  private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
  // Maps regionIndex -> serverIndex with best region cache ratio
  private int[] regionServerIndexWithBestRegionCachedRatio;
  // Maps encoded region name -> (old server name, cache ratio of the region on that old server)
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;

  /**
   * Rack manager used when the caller passes none: it reports {@code UNKNOWN_RACK} for every
   * server.
   */
  static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      return UNKNOWN_RACK;
    }
  }

  /**
   * Builds the state with no unassigned regions and no historical cache ratios; delegates to the
   * six-argument constructor.
   */
  BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
    RackManager rackManager) {
    this(null, clusterState, loads, regionFinder, rackManager, null);
  }

  /**
   * Builds the state with no unassigned regions; delegates to the six-argument constructor.
   * @param oldRegionServerRegionCacheRatio cache ratio of regions on the servers that previously
   *                                        hosted them, may be null
   */
  protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
    RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
    this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
  }

  /**
   * Indexes servers, hosts, racks, tables and regions of the given assignment into arrays.
   * @param unassignedRegions               regions without a server; null is treated as empty
   * @param clusterState                    current assignment, server name -> regions
   * @param loads                           load history looked up by region name, falling back to
   *                                        the encoded region name; may be null
   * @param regionFinder                    source of block locations per region; may be null
   * @param rackManager                     rack resolver; a {@link DefaultRackManager} is used when
   *                                        null
   * @param oldRegionServerRegionCacheRatio stored as {@link #regionCacheRatioOnOldServerMap}; may
   *                                        be null
   */
  @SuppressWarnings("unchecked")
  BalancerClusterState(Collection<RegionInfo> unassignedRegions,
    Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
    Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
    if (unassignedRegions == null) {
      unassignedRegions = Collections.emptyList();
    }

    serversToIndex = new HashMap<>();
    hostsToIndex = new HashMap<>();
    racksToIndex = new HashMap<>();
    tablesToIndex = new HashMap<>();

    // TODO: We should get the list of tables from master
    tables = new ArrayList<>();
    this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

    this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;

    numRegions = 0;

    // Per-host / per-rack server index lists, converted to arrays once all servers are indexed.
    List<List<Integer>> serversPerHostList = new ArrayList<>();
    List<List<Integer>> serversPerRackList = new ArrayList<>();
    this.clusterState = clusterState;
    this.regionFinder = regionFinder;

    // Use servername and port as there can be dead servers in this list. We want everything with
    // a matching hostname and port to have the same index.
173 for (ServerName sn : clusterState.keySet()) { 174 if (sn == null) { 175 LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " 176 + "skipping; unassigned regions?"); 177 if (LOG.isTraceEnabled()) { 178 LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); 179 } 180 continue; 181 } 182 if (serversToIndex.get(sn.getAddress()) == null) { 183 serversToIndex.put(sn.getAddress(), numServers++); 184 } 185 if (!hostsToIndex.containsKey(sn.getHostname())) { 186 hostsToIndex.put(sn.getHostname(), numHosts++); 187 serversPerHostList.add(new ArrayList<>(1)); 188 } 189 190 int serverIndex = serversToIndex.get(sn.getAddress()); 191 int hostIndex = hostsToIndex.get(sn.getHostname()); 192 serversPerHostList.get(hostIndex).add(serverIndex); 193 194 String rack = this.rackManager.getRack(sn); 195 196 if (!racksToIndex.containsKey(rack)) { 197 racksToIndex.put(rack, numRacks++); 198 serversPerRackList.add(new ArrayList<>()); 199 } 200 int rackIndex = racksToIndex.get(rack); 201 serversPerRackList.get(rackIndex).add(serverIndex); 202 } 203 204 LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex); 205 // Count how many regions there are. 
206 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 207 numRegions += entry.getValue().size(); 208 } 209 numRegions += unassignedRegions.size(); 210 211 regionsToIndex = new HashMap<>(numRegions); 212 servers = new ServerName[numServers]; 213 serversPerHost = new int[numHosts][]; 214 serversPerRack = new int[numRacks][]; 215 regions = new RegionInfo[numRegions]; 216 regionIndexToServerIndex = new int[numRegions]; 217 initialRegionIndexToServerIndex = new int[numRegions]; 218 regionIndexToTableIndex = new int[numRegions]; 219 regionIndexToPrimaryIndex = new int[numRegions]; 220 regionLoads = new Deque[numRegions]; 221 222 regionLocations = new int[numRegions][]; 223 serverIndicesSortedByRegionCount = new Integer[numServers]; 224 serverIndicesSortedByLocality = new Integer[numServers]; 225 localityPerServer = new float[numServers]; 226 227 serverIndexToHostIndex = new int[numServers]; 228 serverIndexToRackIndex = new int[numServers]; 229 regionsPerServer = new int[numServers][]; 230 serverIndexToRegionsOffset = new int[numServers]; 231 regionsPerHost = new int[numHosts][]; 232 regionsPerRack = new int[numRacks][]; 233 colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers]; 234 colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts]; 235 colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks]; 236 237 int regionIndex = 0, regionPerServerIndex = 0; 238 239 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 240 if (entry.getKey() == null) { 241 LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); 242 continue; 243 } 244 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 245 246 // keep the servername if this is the first server name for this hostname 247 // or this servername has the newest startcode. 
248 if ( 249 servers[serverIndex] == null 250 || servers[serverIndex].getStartcode() < entry.getKey().getStartcode() 251 ) { 252 servers[serverIndex] = entry.getKey(); 253 } 254 255 if (regionsPerServer[serverIndex] != null) { 256 // there is another server with the same hostAndPort in ClusterState. 257 // allocate the array for the total size 258 regionsPerServer[serverIndex] = 259 new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; 260 } else { 261 regionsPerServer[serverIndex] = new int[entry.getValue().size()]; 262 } 263 colocatedReplicaCountsPerServer[serverIndex] = 264 new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 265 serverIndicesSortedByRegionCount[serverIndex] = serverIndex; 266 serverIndicesSortedByLocality[serverIndex] = serverIndex; 267 } 268 269 hosts = new String[numHosts]; 270 for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) { 271 hosts[entry.getValue()] = entry.getKey(); 272 } 273 racks = new String[numRacks]; 274 for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) { 275 racks[entry.getValue()] = entry.getKey(); 276 } 277 278 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 279 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 280 regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; 281 282 int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); 283 serverIndexToHostIndex[serverIndex] = hostIndex; 284 285 int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); 286 serverIndexToRackIndex[serverIndex] = rackIndex; 287 288 for (RegionInfo region : entry.getValue()) { 289 registerRegion(region, regionIndex, serverIndex, loads, regionFinder); 290 regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; 291 regionIndex++; 292 } 293 serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex; 294 } 295 296 for (RegionInfo region : unassignedRegions) { 297 
registerRegion(region, regionIndex, -1, loads, regionFinder); 298 regionIndex++; 299 } 300 301 if (LOG.isDebugEnabled()) { 302 for (int i = 0; i < numServers; i++) { 303 LOG.debug("server {} has {} regions", i, regionsPerServer[i].length); 304 } 305 } 306 for (int i = 0; i < serversPerHostList.size(); i++) { 307 serversPerHost[i] = new int[serversPerHostList.get(i).size()]; 308 for (int j = 0; j < serversPerHost[i].length; j++) { 309 serversPerHost[i][j] = serversPerHostList.get(i).get(j); 310 LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i); 311 } 312 if (serversPerHost[i].length > 1) { 313 multiServersPerHost = true; 314 } 315 } 316 317 for (int i = 0; i < serversPerRackList.size(); i++) { 318 serversPerRack[i] = new int[serversPerRackList.get(i).size()]; 319 for (int j = 0; j < serversPerRack[i].length; j++) { 320 serversPerRack[i][j] = serversPerRackList.get(i).get(j); 321 LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i); 322 } 323 } 324 325 numTables = tables.size(); 326 LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts, 327 numRacks); 328 numRegionsPerServerPerTable = new int[numTables][numServers]; 329 numRegionsPerTable = new int[numTables]; 330 331 for (int i = 0; i < numTables; i++) { 332 for (int j = 0; j < numServers; j++) { 333 numRegionsPerServerPerTable[i][j] = 0; 334 } 335 } 336 337 for (int i = 0; i < regionIndexToServerIndex.length; i++) { 338 if (regionIndexToServerIndex[i] >= 0) { 339 numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++; 340 numRegionsPerTable[regionIndexToTableIndex[i]]++; 341 } 342 } 343 344 // Avoid repeated computation for planning 345 meanRegionsPerTable = new double[numTables]; 346 347 for (int i = 0; i < numTables; i++) { 348 meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers; 349 } 350 351 for (int i = 0; i < regions.length; i++) { 352 RegionInfo info = regions[i]; 353 if 
(RegionReplicaUtil.isDefaultReplica(info)) { 354 regionIndexToPrimaryIndex[i] = i; 355 } else { 356 hasRegionReplicas = true; 357 RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); 358 regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); 359 } 360 } 361 362 for (int i = 0; i < regionsPerServer.length; i++) { 363 colocatedReplicaCountsPerServer[i] = 364 new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 365 for (int j = 0; j < regionsPerServer[i].length; j++) { 366 int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; 367 colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex); 368 } 369 } 370 // compute regionsPerHost 371 if (multiServersPerHost) { 372 populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost, 373 serversPerHost); 374 } 375 376 // compute regionsPerRack 377 if (numRacks > 1) { 378 populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack, 379 serversPerRack); 380 } 381 } 382 383 private void populateRegionPerLocationFromServer(int[][] regionsPerLocation, 384 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) { 385 for (int i = 0; i < serversPerLocation.length; i++) { 386 int numRegionsPerLocation = 0; 387 for (int j = 0; j < serversPerLocation[i].length; j++) { 388 numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length; 389 } 390 regionsPerLocation[i] = new int[numRegionsPerLocation]; 391 colocatedReplicaCountsPerLocation[i] = 392 new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0); 393 } 394 395 for (int i = 0; i < serversPerLocation.length; i++) { 396 int numRegionPerLocationIndex = 0; 397 for (int j = 0; j < serversPerLocation[i].length; j++) { 398 for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) { 399 int region = regionsPerServer[serversPerLocation[i][j]][k]; 400 
regionsPerLocation[i][numRegionPerLocationIndex] = region; 401 int primaryIndex = regionIndexToPrimaryIndex[region]; 402 colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex); 403 numRegionPerLocationIndex++; 404 } 405 } 406 } 407 408 } 409 410 /** Helper for Cluster constructor to handle a region */ 411 private void registerRegion(RegionInfo region, int regionIndex, int serverIndex, 412 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) { 413 String tableName = region.getTable().getNameAsString(); 414 if (!tablesToIndex.containsKey(tableName)) { 415 tables.add(tableName); 416 tablesToIndex.put(tableName, tablesToIndex.size()); 417 } 418 int tableIndex = tablesToIndex.get(tableName); 419 420 regionsToIndex.put(region, regionIndex); 421 regions[regionIndex] = region; 422 regionIndexToServerIndex[regionIndex] = serverIndex; 423 initialRegionIndexToServerIndex[regionIndex] = serverIndex; 424 regionIndexToTableIndex[regionIndex] = tableIndex; 425 426 // region load 427 if (loads != null) { 428 Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString()); 429 // That could have failed if the RegionLoad is using the other regionName 430 if (rl == null) { 431 // Try getting the region load using encoded name. 432 rl = loads.get(region.getEncodedName()); 433 } 434 regionLoads[regionIndex] = rl; 435 } 436 437 if (regionFinder != null) { 438 // region location 439 List<ServerName> loc = regionFinder.getTopBlockLocations(region); 440 regionLocations[regionIndex] = new int[loc.size()]; 441 for (int i = 0; i < loc.size(); i++) { 442 regionLocations[regionIndex][i] = loc.get(i) == null 443 ? -1 444 : (serversToIndex.get(loc.get(i).getAddress()) == null 445 ? 
-1 446 : serversToIndex.get(loc.get(i).getAddress())); 447 } 448 } 449 } 450 451 /** 452 * Returns true iff a given server has less regions than the balanced amount 453 */ 454 public boolean serverHasTooFewRegions(int server) { 455 int minLoad = this.numRegions / numServers; 456 int numRegions = getNumRegions(server); 457 return numRegions < minLoad; 458 } 459 460 /** 461 * Retrieves and lazily initializes a field storing the locality of every region/server 462 * combination 463 */ 464 public float[][] getOrComputeRackLocalities() { 465 if (rackLocalities == null || regionsToMostLocalEntities == null) { 466 computeCachedLocalities(); 467 } 468 return rackLocalities; 469 } 470 471 /** 472 * Lazily initializes and retrieves a mapping of region -> server for which region has the highest 473 * the locality 474 */ 475 public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) { 476 if (rackLocalities == null || regionsToMostLocalEntities == null) { 477 computeCachedLocalities(); 478 } 479 return regionsToMostLocalEntities[type.ordinal()]; 480 } 481 482 /** 483 * Looks up locality from cache of localities. Will create cache if it does not already exist. 484 */ 485 public float getOrComputeLocality(int region, int entity, 486 BalancerClusterState.LocalityType type) { 487 switch (type) { 488 case SERVER: 489 return getLocalityOfRegion(region, entity); 490 case RACK: 491 return getOrComputeRackLocalities()[region][entity]; 492 default: 493 throw new IllegalArgumentException("Unsupported LocalityType: " + type); 494 } 495 } 496 497 /** 498 * Returns locality weighted by region size in MB. Will create locality cache if it does not 499 * already exist. 
500 */ 501 public double getOrComputeWeightedLocality(int region, int server, 502 BalancerClusterState.LocalityType type) { 503 return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); 504 } 505 506 /** 507 * Returns the size in MB from the most recent RegionLoad for region 508 */ 509 public int getRegionSizeMB(int region) { 510 Deque<BalancerRegionLoad> load = regionLoads[region]; 511 // This means regions have no actual data on disk 512 if (load == null) { 513 return 0; 514 } 515 return regionLoads[region].getLast().getStorefileSizeMB(); 516 } 517 518 /** 519 * Computes and caches the locality for each region/rack combinations, as well as storing a 520 * mapping of region -> server and region -> rack such that server and rack have the highest 521 * locality for region 522 */ 523 private void computeCachedLocalities() { 524 rackLocalities = new float[numRegions][numRacks]; 525 regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; 526 527 // Compute localities and find most local server per region 528 for (int region = 0; region < numRegions; region++) { 529 int serverWithBestLocality = 0; 530 float bestLocalityForRegion = 0; 531 for (int server = 0; server < numServers; server++) { 532 // Aggregate per-rack locality 533 float locality = getLocalityOfRegion(region, server); 534 int rack = serverIndexToRackIndex[server]; 535 int numServersInRack = serversPerRack[rack].length; 536 rackLocalities[region][rack] += locality / numServersInRack; 537 538 if (locality > bestLocalityForRegion) { 539 serverWithBestLocality = server; 540 bestLocalityForRegion = locality; 541 } 542 } 543 regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; 544 545 // Find most local rack per region 546 int rackWithBestLocality = 0; 547 float bestRackLocalityForRegion = 0.0f; 548 for (int rack = 0; rack < numRacks; rack++) { 549 float rackLocality = rackLocalities[region][rack]; 550 if (rackLocality > 
bestRackLocalityForRegion) { 551 bestRackLocalityForRegion = rackLocality; 552 rackWithBestLocality = rack; 553 } 554 } 555 regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; 556 } 557 558 } 559 560 /** 561 * Returns the size of hFiles from the most recent RegionLoad for region 562 */ 563 public int getTotalRegionHFileSizeMB(int region) { 564 Deque<BalancerRegionLoad> load = regionLoads[region]; 565 if (load == null) { 566 // This means, that the region has no actual data on disk 567 return 0; 568 } 569 return regionLoads[region].getLast().getRegionSizeMB(); 570 } 571 572 /** 573 * Returns the weighted cache ratio of a region on the given region server 574 */ 575 public float getOrComputeWeightedRegionCacheRatio(int region, int server) { 576 return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server); 577 } 578 579 /** 580 * Returns the amount by which a region is cached on a given region server. If the region is not 581 * currently hosted on the given region server, then find out if it was previously hosted there 582 * and return the old cache ratio. 583 */ 584 protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) { 585 float regionCacheRatio = 0.0f; 586 587 // Get the current region cache ratio if the region is hosted on the server regionServerIndex 588 for (int regionIndex : regionsPerServer[regionServerIndex]) { 589 if (region != regionIndex) { 590 continue; 591 } 592 593 Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex]; 594 595 // The region is currently hosted on this region server. Get the region cache ratio for this 596 // region on this server 597 regionCacheRatio = 598 regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio(); 599 600 return regionCacheRatio; 601 } 602 603 // Region is not currently hosted on this server. Check if the region was cached on this 604 // server earlier. 
This can happen when the server was shutdown and the cache was persisted. 605 // Search using the region name and server name and not the index id and server id as these ids 606 // may change when a server is marked as dead or a new server is added. 607 String regionEncodedName = regions[region].getEncodedName(); 608 ServerName serverName = servers[regionServerIndex]; 609 if ( 610 regionCacheRatioOnOldServerMap != null 611 && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName) 612 ) { 613 Pair<ServerName, Float> cacheRatioOfRegionOnServer = 614 regionCacheRatioOnOldServerMap.get(regionEncodedName); 615 if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) { 616 regionCacheRatio = cacheRatioOfRegionOnServer.getSecond(); 617 if (LOG.isDebugEnabled()) { 618 LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName, 619 serverName, regionCacheRatio); 620 } 621 } 622 } 623 return regionCacheRatio; 624 } 625 626 /** 627 * Populate the maps containing information about how much a region is cached on a region server. 628 */ 629 private void computeRegionServerRegionCacheRatio() { 630 regionIndexServerIndexRegionCachedRatio = new HashMap<>(); 631 regionServerIndexWithBestRegionCachedRatio = new int[numRegions]; 632 633 for (int region = 0; region < numRegions; region++) { 634 float bestRegionCacheRatio = 0.0f; 635 int serverWithBestRegionCacheRatio = 0; 636 for (int server = 0; server < numServers; server++) { 637 float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server); 638 if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) { 639 // A region with cache ratio 0 on a server means nothing. Hence, just make a note of 640 // cache ratio only if the cache ratio is greater than 0. 
641 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 642 regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio); 643 } 644 if (regionCacheRatio > bestRegionCacheRatio) { 645 serverWithBestRegionCacheRatio = server; 646 // If the server currently hosting the region has equal cache ratio to a historical 647 // server, consider the current server to keep hosting the region 648 bestRegionCacheRatio = regionCacheRatio; 649 } else if ( 650 regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region] 651 ) { 652 // If two servers have same region cache ratio, then the server currently hosting the 653 // region 654 // should retain the region 655 serverWithBestRegionCacheRatio = server; 656 } 657 } 658 regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio; 659 Pair<Integer, Integer> regionServerPair = 660 new Pair<>(region, regionIndexToServerIndex[region]); 661 float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair); 662 if (tempRegionCacheRatio > bestRegionCacheRatio) { 663 LOG.warn( 664 "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the " 665 + "best region cache ratio {} on server {}", 666 regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]], 667 tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]); 668 } 669 } 670 } 671 672 protected float getOrComputeRegionCacheRatio(int region, int server) { 673 if ( 674 regionServerIndexWithBestRegionCachedRatio == null 675 || regionIndexServerIndexRegionCachedRatio.isEmpty() 676 ) { 677 computeRegionServerRegionCacheRatio(); 678 } 679 680 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 681 return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair) 682 ? 
regionIndexServerIndexRegionCachedRatio.get(regionServerPair) 683 : 0.0f; 684 } 685 686 public int[] getOrComputeServerWithBestRegionCachedRatio() { 687 if ( 688 regionServerIndexWithBestRegionCachedRatio == null 689 || regionIndexServerIndexRegionCachedRatio.isEmpty() 690 ) { 691 computeRegionServerRegionCacheRatio(); 692 } 693 return regionServerIndexWithBestRegionCachedRatio; 694 } 695 696 /** 697 * Maps region index to rack index 698 */ 699 public int getRackForRegion(int region) { 700 return serverIndexToRackIndex[regionIndexToServerIndex[region]]; 701 } 702 703 enum LocalityType { 704 SERVER, 705 RACK 706 } 707 708 public void doAction(BalanceAction action) { 709 switch (action.getType()) { 710 case NULL: 711 break; 712 case ASSIGN_REGION: 713 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings 714 assert action instanceof AssignRegionAction : action.getClass(); 715 AssignRegionAction ar = (AssignRegionAction) action; 716 regionsPerServer[ar.getServer()] = 717 addRegion(regionsPerServer[ar.getServer()], ar.getRegion()); 718 regionMoved(ar.getRegion(), -1, ar.getServer()); 719 break; 720 case MOVE_REGION: 721 assert action instanceof MoveRegionAction : action.getClass(); 722 MoveRegionAction mra = (MoveRegionAction) action; 723 regionsPerServer[mra.getFromServer()] = 724 removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion()); 725 regionsPerServer[mra.getToServer()] = 726 addRegion(regionsPerServer[mra.getToServer()], mra.getRegion()); 727 regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); 728 break; 729 case SWAP_REGIONS: 730 assert action instanceof SwapRegionsAction : action.getClass(); 731 SwapRegionsAction a = (SwapRegionsAction) action; 732 regionsPerServer[a.getFromServer()] = 733 replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion()); 734 regionsPerServer[a.getToServer()] = 735 replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion()); 736 
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); 737 regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); 738 break; 739 default: 740 throw new RuntimeException("Uknown action:" + action.getType()); 741 } 742 } 743 744 /** 745 * Return true if the placement of region on server would lower the availability of the region in 746 * question 747 * @return true or false 748 */ 749 boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { 750 if (!serversToIndex.containsKey(serverName.getAddress())) { 751 return false; // safeguard against race between cluster.servers and servers from LB method 752 // args 753 } 754 int server = serversToIndex.get(serverName.getAddress()); 755 int region = regionsToIndex.get(regionInfo); 756 757 // Region replicas for same region should better assign to different servers 758 for (int i : regionsPerServer[server]) { 759 RegionInfo otherRegionInfo = regions[i]; 760 if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) { 761 return true; 762 } 763 } 764 765 int primary = regionIndexToPrimaryIndex[region]; 766 if (primary == -1) { 767 return false; 768 } 769 // there is a subset relation for server < host < rack 770 // check server first 771 int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary); 772 if (result != 0) { 773 return result > 0; 774 } 775 776 // check host 777 if (multiServersPerHost) { 778 result = checkLocationForPrimary(serverIndexToHostIndex[server], 779 colocatedReplicaCountsPerHost, primary); 780 if (result != 0) { 781 return result > 0; 782 } 783 } 784 785 // check rack 786 if (numRacks > 1) { 787 result = checkLocationForPrimary(serverIndexToRackIndex[server], 788 colocatedReplicaCountsPerRack, primary); 789 if (result != 0) { 790 return result > 0; 791 } 792 } 793 return false; 794 } 795 796 /** 797 * Common method for better solution check. 
798 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 799 * colocatedReplicaCountsPerRack 800 * @return 1 for better, -1 for no better, 0 for unknown 801 */ 802 private int checkLocationForPrimary(int location, 803 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) { 804 if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) { 805 // check for whether there are other Locations that we can place this region 806 for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) { 807 if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) { 808 return 1; // meaning there is a better Location 809 } 810 } 811 return -1; // there is not a better Location to place this 812 } 813 return 0; 814 } 815 816 void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { 817 if (!serversToIndex.containsKey(serverName.getAddress())) { 818 return; 819 } 820 int server = serversToIndex.get(serverName.getAddress()); 821 int region = regionsToIndex.get(regionInfo); 822 doAction(new AssignRegionAction(region, server)); 823 } 824 825 void regionMoved(int region, int oldServer, int newServer) { 826 regionIndexToServerIndex[region] = newServer; 827 if (initialRegionIndexToServerIndex[region] == newServer) { 828 numMovedRegions--; // region moved back to original location 829 } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { 830 numMovedRegions++; // region moved from original location 831 } 832 int tableIndex = regionIndexToTableIndex[region]; 833 if (oldServer >= 0) { 834 numRegionsPerServerPerTable[tableIndex][oldServer]--; 835 } 836 numRegionsPerServerPerTable[tableIndex][newServer]++; 837 838 // update for servers 839 int primary = regionIndexToPrimaryIndex[region]; 840 if (oldServer >= 0) { 841 colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary); 842 } 843 colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary); 844 845 // update for 
hosts 846 if (multiServersPerHost) { 847 updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost, 848 oldServer, newServer, primary, region); 849 } 850 851 // update for racks 852 if (numRacks > 1) { 853 updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack, 854 oldServer, newServer, primary, region); 855 } 856 } 857 858 /** 859 * Common method for per host and per Location region index updates when a region is moved. 860 * @param serverIndexToLocation serverIndexToHostIndex or serverIndexToLocationIndex 861 * @param regionsPerLocation regionsPerHost or regionsPerLocation 862 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 863 * colocatedReplicaCountsPerRack 864 */ 865 private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation, 866 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer, 867 int primary, int region) { 868 int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1; 869 int newLocation = serverIndexToLocation[newServer]; 870 if (newLocation != oldLocation) { 871 regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region); 872 colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary); 873 if (oldLocation >= 0) { 874 regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region); 875 colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary); 876 } 877 } 878 879 } 880 881 int[] removeRegion(int[] regions, int regionIndex) { 882 // TODO: this maybe costly. 
Consider using linked lists 883 int[] newRegions = new int[regions.length - 1]; 884 int i = 0; 885 for (i = 0; i < regions.length; i++) { 886 if (regions[i] == regionIndex) { 887 break; 888 } 889 newRegions[i] = regions[i]; 890 } 891 System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i); 892 return newRegions; 893 } 894 895 int[] addRegion(int[] regions, int regionIndex) { 896 int[] newRegions = new int[regions.length + 1]; 897 System.arraycopy(regions, 0, newRegions, 0, regions.length); 898 newRegions[newRegions.length - 1] = regionIndex; 899 return newRegions; 900 } 901 902 int[] addRegionSorted(int[] regions, int regionIndex) { 903 int[] newRegions = new int[regions.length + 1]; 904 int i = 0; 905 for (i = 0; i < regions.length; i++) { // find the index to insert 906 if (regions[i] > regionIndex) { 907 break; 908 } 909 } 910 System.arraycopy(regions, 0, newRegions, 0, i); // copy first half 911 System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half 912 newRegions[i] = regionIndex; 913 914 return newRegions; 915 } 916 917 int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { 918 int i = 0; 919 for (i = 0; i < regions.length; i++) { 920 if (regions[i] == regionIndex) { 921 regions[i] = newRegionIndex; 922 break; 923 } 924 } 925 return regions; 926 } 927 928 void sortServersByRegionCount() { 929 Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); 930 } 931 932 int getNumRegions(int server) { 933 return regionsPerServer[server].length; 934 } 935 936 boolean contains(int[] arr, int val) { 937 return Arrays.binarySearch(arr, val) >= 0; 938 } 939 940 private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); 941 942 public Comparator<Integer> getNumRegionsComparator() { 943 return numRegionsComparator; 944 } 945 946 int getLowestLocalityRegionOnServer(int serverIndex) { 947 if (regionFinder != null) { 948 float lowestLocality = 1.0f; 949 int 
lowestLocalityRegionIndex = -1; 950 if (regionsPerServer[serverIndex].length == 0) { 951 // No regions on that region server 952 return -1; 953 } 954 for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { 955 int regionIndex = regionsPerServer[serverIndex][j]; 956 HDFSBlocksDistribution distribution = 957 regionFinder.getBlockDistribution(regions[regionIndex]); 958 float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); 959 // skip empty region 960 if (distribution.getUniqueBlocksTotalWeight() == 0) { 961 continue; 962 } 963 if (locality < lowestLocality) { 964 lowestLocality = locality; 965 lowestLocalityRegionIndex = j; 966 } 967 } 968 if (lowestLocalityRegionIndex == -1) { 969 return -1; 970 } 971 if (LOG.isTraceEnabled()) { 972 LOG.trace("Lowest locality region is " 973 + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]] 974 .getRegionNameAsString() 975 + " with locality " + lowestLocality + " and its region server contains " 976 + regionsPerServer[serverIndex].length + " regions"); 977 } 978 return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; 979 } else { 980 return -1; 981 } 982 } 983 984 float getLocalityOfRegion(int region, int server) { 985 if (regionFinder != null) { 986 HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); 987 return distribution.getBlockLocalityIndex(servers[server].getHostname()); 988 } else { 989 return 0f; 990 } 991 } 992 993 void setNumRegions(int numRegions) { 994 this.numRegions = numRegions; 995 } 996 997 void setNumMovedRegions(int numMovedRegions) { 998 this.numMovedRegions = numMovedRegions; 999 } 1000 1001 @Override 1002 public String toString() { 1003 StringBuilder desc = new StringBuilder("Cluster={servers=["); 1004 for (ServerName sn : servers) { 1005 desc.append(sn.getAddress().toString()).append(", "); 1006 } 1007 desc.append("], serverIndicesSortedByRegionCount=") 1008 
.append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=") 1009 .append(Arrays.deepToString(regionsPerServer)); 1010 1011 desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) 1012 .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions) 1013 .append('}'); 1014 return desc.toString(); 1015 } 1016}