001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.TimeUnit; 031import java.util.function.Supplier; 032import org.agrona.collections.Hashing; 033import org.agrona.collections.Int2IntCounterMap; 034import org.apache.hadoop.hbase.HDFSBlocksDistribution; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.RegionReplicaUtil; 038import org.apache.hadoop.hbase.master.RackManager; 039import org.apache.hadoop.hbase.net.Address; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.hbase.util.Pair; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; 047 048/** 049 * An efficient array based implementation similar to ClusterState for keeping the status of the 050 * cluster in terms of region assignment and distribution. LoadBalancers, such as 051 * StochasticLoadBalancer uses this Cluster object because of hundreds of thousands of hashmap 052 * manipulations are very costly, which is why this class uses mostly indexes and arrays. 053 * <p/> 054 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server 055 * topology in terms of server names, hostnames and racks. 056 */ 057@InterfaceAudience.Private 058class BalancerClusterState { 059 060 private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class); 061 062 ServerName[] servers; 063 // ServerName uniquely identifies a region server. multiple RS can run on the same host 064 String[] hosts; 065 String[] racks; 066 boolean multiServersPerHost = false; // whether or not any host has more than one server 067 068 ArrayList<String> tables; 069 RegionInfo[] regions; 070 Deque<BalancerRegionLoad>[] regionLoads; 071 private RegionHDFSBlockLocationFinder regionFinder; 072 073 int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality 074 075 int[] serverIndexToHostIndex; // serverIndex -> host index 076 int[] serverIndexToRackIndex; // serverIndex -> rack index 077 078 int[][] regionsPerServer; // serverIndex -> region list 079 int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list 080 int[][] regionsPerHost; // hostIndex -> list of regions 081 int[][] regionsPerRack; // rackIndex -> region list 082 Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated 083 // replicas by primary region index 084 Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by 085 // primary region index 086 Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by 087 // primary region index 088 089 int[][] serversPerHost; // hostIndex -> list of server indexes 090 int[][] serversPerRack; // rackIndex -> list of server indexes 091 int[] regionIndexToServerIndex; // regionIndex -> serverIndex 092 int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state) 093 int[] regionIndexToTableIndex; // regionIndex -> tableIndex 094 int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> tableIndex -> # regions 095 int[] numRegionsPerTable; // tableIndex -> region count 096 double[] meanRegionsPerTable; // mean region count per table 097 int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary 098 boolean hasRegionReplicas = false; // whether there is regions with replicas 099 100 Integer[] serverIndicesSortedByRegionCount; 101 Integer[] serverIndicesSortedByLocality; 102 103 Map<Address, Integer> serversToIndex; 104 Map<String, Integer> hostsToIndex; 105 Map<String, Integer> racksToIndex; 106 Map<String, Integer> tablesToIndex; 107 Map<RegionInfo, Integer> regionsToIndex; 108 float[] localityPerServer; 109 110 int numServers; 111 int numHosts; 112 int numRacks; 113 int numTables; 114 int numRegions; 115 int maxReplicas = 1; 116 117 int numMovedRegions = 0; // num moved regions from the initial configuration 118 Map<ServerName, List<RegionInfo>> clusterState; 119 120 private final RackManager rackManager; 121 // Maps region -> rackIndex -> locality of region on rack 122 private float[][] rackLocalities; 123 // Maps localityType -> region -> [server|rack]Index with highest locality 124 private int[][] regionsToMostLocalEntities; 125 // Maps region -> serverIndex -> regionCacheRatio of a region on a server 126 private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio; 127 // Maps regionIndex -> serverIndex with best region cache ratio 128 private int[] regionServerIndexWithBestRegionCachedRatio; 129 // Maps regionName -> oldServerName -> cache ratio of the region on the old server 130 Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap; 131 // cache free space available on each server, aligned to the "servers" array indices; 132 long[] serverBlockCacheFreeSize; 133 134 private final Supplier<List<Integer>> shuffledServerIndicesSupplier = 135 Suppliers.memoizeWithExpiration(() -> { 136 Collection<Integer> serverIndices = serversToIndex.values(); 137 List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices); 138 Collections.shuffle(shuffledServerIndices); 139 return shuffledServerIndices; 140 }, 5, TimeUnit.SECONDS); 141 private long stopRequestedAt = Long.MAX_VALUE; 142 143 static class DefaultRackManager extends RackManager { 144 @Override 145 public String getRack(ServerName server) { 146 return UNKNOWN_RACK; 147 } 148 } 149 150 BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, 151 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder, 152 RackManager rackManager) { 153 this(null, clusterState, loads, regionFinder, rackManager, null, null); 154 } 155 156 protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, 157 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder, 158 RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio, 159 Map<ServerName, Long> serverBlockCacheFreeByServer) { 160 this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio, 161 serverBlockCacheFreeByServer); 162 } 163 164 @SuppressWarnings("unchecked") 165 BalancerClusterState(Collection<RegionInfo> unassignedRegions, 166 Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads, 167 RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager, 168 Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio, 169 Map<ServerName, Long> serverBlockCacheFreeByServer) { 170 if (unassignedRegions == null) { 171 unassignedRegions = Collections.emptyList(); 172 } 173 174 serversToIndex = new HashMap<>(); 175 hostsToIndex = new HashMap<>(); 176 racksToIndex = new HashMap<>(); 177 tablesToIndex = new HashMap<>(); 178 179 // TODO: We should get the list of tables from master 180 tables = new ArrayList<>(); 181 this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); 182 183 this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio; 184 185 numRegions = 0; 186 187 List<List<Integer>> serversPerHostList = new ArrayList<>(); 188 List<List<Integer>> serversPerRackList = new ArrayList<>(); 189 this.clusterState = clusterState; 190 this.regionFinder = regionFinder; 191 192 // Use servername and port as there can be dead servers in this list. We want everything with 193 // a matching hostname and port to have the same index. 194 for (ServerName sn : clusterState.keySet()) { 195 if (sn == null) { 196 LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " 197 + "skipping; unassigned regions?"); 198 if (LOG.isTraceEnabled()) { 199 LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); 200 } 201 continue; 202 } 203 if (serversToIndex.get(sn.getAddress()) == null) { 204 serversToIndex.put(sn.getAddress(), numServers++); 205 } 206 if (!hostsToIndex.containsKey(sn.getHostname())) { 207 hostsToIndex.put(sn.getHostname(), numHosts++); 208 serversPerHostList.add(new ArrayList<>(1)); 209 } 210 211 int serverIndex = serversToIndex.get(sn.getAddress()); 212 int hostIndex = hostsToIndex.get(sn.getHostname()); 213 serversPerHostList.get(hostIndex).add(serverIndex); 214 215 String rack = this.rackManager.getRack(sn); 216 217 if (!racksToIndex.containsKey(rack)) { 218 racksToIndex.put(rack, numRacks++); 219 serversPerRackList.add(new ArrayList<>()); 220 } 221 int rackIndex = racksToIndex.get(rack); 222 serversPerRackList.get(rackIndex).add(serverIndex); 223 } 224 225 LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex); 226 // Count how many regions there are. 227 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 228 numRegions += entry.getValue().size(); 229 } 230 numRegions += unassignedRegions.size(); 231 232 regionsToIndex = new HashMap<>(numRegions); 233 servers = new ServerName[numServers]; 234 serversPerHost = new int[numHosts][]; 235 serversPerRack = new int[numRacks][]; 236 regions = new RegionInfo[numRegions]; 237 regionIndexToServerIndex = new int[numRegions]; 238 initialRegionIndexToServerIndex = new int[numRegions]; 239 regionIndexToTableIndex = new int[numRegions]; 240 regionIndexToPrimaryIndex = new int[numRegions]; 241 regionLoads = new Deque[numRegions]; 242 243 regionLocations = new int[numRegions][]; 244 serverIndicesSortedByRegionCount = new Integer[numServers]; 245 serverIndicesSortedByLocality = new Integer[numServers]; 246 localityPerServer = new float[numServers]; 247 248 serverIndexToHostIndex = new int[numServers]; 249 serverIndexToRackIndex = new int[numServers]; 250 regionsPerServer = new int[numServers][]; 251 serverIndexToRegionsOffset = new int[numServers]; 252 regionsPerHost = new int[numHosts][]; 253 regionsPerRack = new int[numRacks][]; 254 colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers]; 255 colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts]; 256 colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks]; 257 258 int regionIndex = 0, regionPerServerIndex = 0; 259 260 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 261 if (entry.getKey() == null) { 262 LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); 263 continue; 264 } 265 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 266 267 // keep the servername if this is the first server name for this hostname 268 // or this servername has the newest startcode. 269 if ( 270 servers[serverIndex] == null 271 || servers[serverIndex].getStartcode() < entry.getKey().getStartcode() 272 ) { 273 servers[serverIndex] = entry.getKey(); 274 } 275 276 if (regionsPerServer[serverIndex] != null) { 277 // there is another server with the same hostAndPort in ClusterState. 278 // allocate the array for the total size 279 regionsPerServer[serverIndex] = 280 new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; 281 } else { 282 regionsPerServer[serverIndex] = new int[entry.getValue().size()]; 283 } 284 colocatedReplicaCountsPerServer[serverIndex] = 285 new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 286 serverIndicesSortedByRegionCount[serverIndex] = serverIndex; 287 serverIndicesSortedByLocality[serverIndex] = serverIndex; 288 } 289 290 hosts = new String[numHosts]; 291 for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) { 292 hosts[entry.getValue()] = entry.getKey(); 293 } 294 racks = new String[numRacks]; 295 for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) { 296 racks[entry.getValue()] = entry.getKey(); 297 } 298 299 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 300 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 301 regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; 302 303 int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); 304 serverIndexToHostIndex[serverIndex] = hostIndex; 305 306 int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); 307 serverIndexToRackIndex[serverIndex] = rackIndex; 308 309 for (RegionInfo region : entry.getValue()) { 310 registerRegion(region, regionIndex, serverIndex, loads, regionFinder); 311 regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; 312 regionIndex++; 313 } 314 serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex; 315 } 316 317 for (RegionInfo region : unassignedRegions) { 318 registerRegion(region, regionIndex, -1, loads, regionFinder); 319 regionIndex++; 320 } 321 322 if (LOG.isTraceEnabled()) { 323 for (int i = 0; i < numServers; i++) { 324 LOG.trace("server {} has {} regions", i, regionsPerServer[i].length); 325 } 326 } 327 for (int i = 0; i < serversPerHostList.size(); i++) { 328 serversPerHost[i] = new int[serversPerHostList.get(i).size()]; 329 for (int j = 0; j < serversPerHost[i].length; j++) { 330 serversPerHost[i][j] = serversPerHostList.get(i).get(j); 331 LOG.trace("server {} is on host {}", serversPerHostList.get(i).get(j), i); 332 } 333 if (serversPerHost[i].length > 1) { 334 multiServersPerHost = true; 335 } 336 } 337 338 for (int i = 0; i < serversPerRackList.size(); i++) { 339 serversPerRack[i] = new int[serversPerRackList.get(i).size()]; 340 for (int j = 0; j < serversPerRack[i].length; j++) { 341 serversPerRack[i][j] = serversPerRackList.get(i).get(j); 342 LOG.trace("server {} is on rack {}", serversPerRackList.get(i).get(j), i); 343 } 344 } 345 346 numTables = tables.size(); 347 LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts, 348 numRacks); 349 numRegionsPerServerPerTable = new int[numTables][numServers]; 350 numRegionsPerTable = new int[numTables]; 351 352 for (int i = 0; i < numTables; i++) { 353 for (int j = 0; j < numServers; j++) { 354 numRegionsPerServerPerTable[i][j] = 0; 355 } 356 } 357 358 for (int i = 0; i < regionIndexToServerIndex.length; i++) { 359 if (regionIndexToServerIndex[i] >= 0) { 360 numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++; 361 numRegionsPerTable[regionIndexToTableIndex[i]]++; 362 } 363 } 364 365 // Avoid repeated computation for planning 366 meanRegionsPerTable = new double[numTables]; 367 368 for (int i = 0; i < numTables; i++) { 369 meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers; 370 } 371 372 for (int i = 0; i < regions.length; i++) { 373 RegionInfo info = regions[i]; 374 if (RegionReplicaUtil.isDefaultReplica(info)) { 375 regionIndexToPrimaryIndex[i] = i; 376 } else { 377 hasRegionReplicas = true; 378 RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); 379 regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); 380 } 381 } 382 383 for (int i = 0; i < regionsPerServer.length; i++) { 384 colocatedReplicaCountsPerServer[i] = 385 new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 386 for (int j = 0; j < regionsPerServer[i].length; j++) { 387 int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; 388 colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex); 389 } 390 } 391 // compute regionsPerHost 392 if (multiServersPerHost) { 393 populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost, 394 serversPerHost); 395 } 396 397 // compute regionsPerRack 398 if (numRacks > 1) { 399 populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack, 400 serversPerRack); 401 } 402 403 this.serverBlockCacheFreeSize = new long[numServers]; 404 if (serverBlockCacheFreeByServer != null) { 405 for (int i = 0; i < numServers; i++) { 406 ServerName sn = servers[i]; 407 this.serverBlockCacheFreeSize[i] = 408 sn == null ? 0L : serverBlockCacheFreeByServer.getOrDefault(sn, 0L); 409 } 410 } 411 } 412 413 private void populateRegionPerLocationFromServer(int[][] regionsPerLocation, 414 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) { 415 for (int i = 0; i < serversPerLocation.length; i++) { 416 int numRegionsPerLocation = 0; 417 for (int j = 0; j < serversPerLocation[i].length; j++) { 418 numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length; 419 } 420 regionsPerLocation[i] = new int[numRegionsPerLocation]; 421 colocatedReplicaCountsPerLocation[i] = 422 new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0); 423 } 424 425 for (int i = 0; i < serversPerLocation.length; i++) { 426 int numRegionPerLocationIndex = 0; 427 for (int j = 0; j < serversPerLocation[i].length; j++) { 428 for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) { 429 int region = regionsPerServer[serversPerLocation[i][j]][k]; 430 regionsPerLocation[i][numRegionPerLocationIndex] = region; 431 int primaryIndex = regionIndexToPrimaryIndex[region]; 432 colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex); 433 numRegionPerLocationIndex++; 434 } 435 } 436 } 437 438 } 439 440 /** Helper for Cluster constructor to handle a region */ 441 private void registerRegion(RegionInfo region, int regionIndex, int serverIndex, 442 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) { 443 String tableName = region.getTable().getNameAsString(); 444 if (!tablesToIndex.containsKey(tableName)) { 445 tables.add(tableName); 446 tablesToIndex.put(tableName, tablesToIndex.size()); 447 } 448 int tableIndex = tablesToIndex.get(tableName); 449 450 regionsToIndex.put(region, regionIndex); 451 regions[regionIndex] = region; 452 regionIndexToServerIndex[regionIndex] = serverIndex; 453 initialRegionIndexToServerIndex[regionIndex] = serverIndex; 454 regionIndexToTableIndex[regionIndex] = tableIndex; 455 456 // region load 457 if (loads != null) { 458 Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString()); 459 // That could have failed if the RegionLoad is using the other regionName 460 if (rl == null) { 461 // Try getting the region load using encoded name. 462 rl = loads.get(region.getEncodedName()); 463 } 464 regionLoads[regionIndex] = rl; 465 } 466 467 if (regionFinder != null) { 468 // region location 469 List<ServerName> loc = regionFinder.getTopBlockLocations(region); 470 regionLocations[regionIndex] = new int[loc.size()]; 471 for (int i = 0; i < loc.size(); i++) { 472 regionLocations[regionIndex][i] = loc.get(i) == null 473 ? -1 474 : (serversToIndex.get(loc.get(i).getAddress()) == null 475 ? -1 476 : serversToIndex.get(loc.get(i).getAddress())); 477 } 478 } 479 480 int numReplicas = region.getReplicaId() + 1; 481 if (numReplicas > maxReplicas) { 482 maxReplicas = numReplicas; 483 } 484 } 485 486 /** 487 * Returns true iff a given server has less regions than the balanced amount 488 */ 489 public boolean serverHasTooFewRegions(int server) { 490 int minLoad = this.numRegions / numServers; 491 int numRegions = getNumRegions(server); 492 return numRegions < minLoad; 493 } 494 495 /** 496 * Retrieves and lazily initializes a field storing the locality of every region/server 497 * combination 498 */ 499 public float[][] getOrComputeRackLocalities() { 500 if (rackLocalities == null || regionsToMostLocalEntities == null) { 501 computeCachedLocalities(); 502 } 503 return rackLocalities; 504 } 505 506 /** 507 * Lazily initializes and retrieves a mapping of region -> server for which region has the highest 508 * the locality 509 */ 510 public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) { 511 if (rackLocalities == null || regionsToMostLocalEntities == null) { 512 computeCachedLocalities(); 513 } 514 return regionsToMostLocalEntities[type.ordinal()]; 515 } 516 517 /** 518 * Looks up locality from cache of localities. Will create cache if it does not already exist. 519 */ 520 public float getOrComputeLocality(int region, int entity, 521 BalancerClusterState.LocalityType type) { 522 switch (type) { 523 case SERVER: 524 return getLocalityOfRegion(region, entity); 525 case RACK: 526 return getOrComputeRackLocalities()[region][entity]; 527 default: 528 throw new IllegalArgumentException("Unsupported LocalityType: " + type); 529 } 530 } 531 532 /** 533 * Returns locality weighted by region size in MB. Will create locality cache if it does not 534 * already exist. 535 */ 536 public double getOrComputeWeightedLocality(int region, int server, 537 BalancerClusterState.LocalityType type) { 538 return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); 539 } 540 541 /** 542 * Returns the size in MB from the most recent RegionLoad for region 543 */ 544 public int getRegionSizeMB(int region) { 545 Deque<BalancerRegionLoad> load = regionLoads[region]; 546 // This means regions have no actual data on disk 547 if (load == null) { 548 return 0; 549 } 550 return regionLoads[region].getLast().getStorefileSizeMB(); 551 } 552 553 /** 554 * Computes and caches the locality for each region/rack combinations, as well as storing a 555 * mapping of region -> server and region -> rack such that server and rack have the highest 556 * locality for region 557 */ 558 private void computeCachedLocalities() { 559 rackLocalities = new float[numRegions][numRacks]; 560 regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; 561 562 // Compute localities and find most local server per region 563 for (int region = 0; region < numRegions; region++) { 564 int serverWithBestLocality = 0; 565 float bestLocalityForRegion = 0; 566 for (int server = 0; server < numServers; server++) { 567 // Aggregate per-rack locality 568 float locality = getLocalityOfRegion(region, server); 569 int rack = serverIndexToRackIndex[server]; 570 int numServersInRack = serversPerRack[rack].length; 571 rackLocalities[region][rack] += locality / numServersInRack; 572 573 if (locality > bestLocalityForRegion) { 574 serverWithBestLocality = server; 575 bestLocalityForRegion = locality; 576 } 577 } 578 regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; 579 580 // Find most local rack per region 581 int rackWithBestLocality = 0; 582 float bestRackLocalityForRegion = 0.0f; 583 for (int rack = 0; rack < numRacks; rack++) { 584 float rackLocality = rackLocalities[region][rack]; 585 if (rackLocality > bestRackLocalityForRegion) { 586 bestRackLocalityForRegion = rackLocality; 587 rackWithBestLocality = rack; 588 } 589 } 590 regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; 591 } 592 593 } 594 595 /** 596 * Returns the size of hFiles from the most recent RegionLoad for region 597 */ 598 public int getTotalRegionHFileSizeMB(int region) { 599 Deque<BalancerRegionLoad> load = regionLoads[region]; 600 if (load == null) { 601 // This means, that the region has no actual data on disk 602 return 0; 603 } 604 return regionLoads[region].getLast().getRegionSizeMB(); 605 } 606 607 /** 608 * Returns the weighted cache ratio of a region on the given region server 609 */ 610 public float getOrComputeWeightedRegionCacheRatio(int region, int server) { 611 return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server); 612 } 613 614 /** 615 * Returns the amount by which a region is cached on a given region server. If the region is not 616 * currently hosted on the given region server, then find out if it was previously hosted there 617 * and return the old cache ratio. 618 */ 619 protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) { 620 float regionCacheRatio = 0.0f; 621 622 // Get the current region cache ratio if the region is hosted on the server regionServerIndex 623 for (int regionIndex : regionsPerServer[regionServerIndex]) { 624 if (region != regionIndex) { 625 continue; 626 } 627 628 Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex]; 629 630 // The region is currently hosted on this region server. Get the region cache ratio for this 631 // region on this server 632 regionCacheRatio = 633 regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio(); 634 635 return regionCacheRatio; 636 } 637 638 // Region is not currently hosted on this server. Check if the region was cached on this 639 // server earlier. This can happen when the server was shutdown and the cache was persisted. 640 // Search using the region name and server name and not the index id and server id as these ids 641 // may change when a server is marked as dead or a new server is added. 642 String regionEncodedName = regions[region].getEncodedName(); 643 ServerName serverName = servers[regionServerIndex]; 644 if ( 645 regionCacheRatioOnOldServerMap != null 646 && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName) 647 ) { 648 Pair<ServerName, Float> cacheRatioOfRegionOnServer = 649 regionCacheRatioOnOldServerMap.get(regionEncodedName); 650 if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) { 651 regionCacheRatio = cacheRatioOfRegionOnServer.getSecond(); 652 if (LOG.isDebugEnabled()) { 653 LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName, 654 serverName, regionCacheRatio); 655 } 656 } 657 } 658 return regionCacheRatio; 659 } 660 661 /** 662 * Populate the maps containing information about how much a region is cached on a region server. 663 */ 664 private void computeRegionServerRegionCacheRatio() { 665 regionIndexServerIndexRegionCachedRatio = new HashMap<>(); 666 regionServerIndexWithBestRegionCachedRatio = new int[numRegions]; 667 668 for (int region = 0; region < numRegions; region++) { 669 float bestRegionCacheRatio = 0.0f; 670 int serverWithBestRegionCacheRatio = 0; 671 for (int server = 0; server < numServers; server++) { 672 float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server); 673 if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) { 674 // A region with cache ratio 0 on a server means nothing. Hence, just make a note of 675 // cache ratio only if the cache ratio is greater than 0. 676 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 677 regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio); 678 } 679 if (regionCacheRatio > bestRegionCacheRatio) { 680 serverWithBestRegionCacheRatio = server; 681 // If the server currently hosting the region has equal cache ratio to a historical 682 // server, consider the current server to keep hosting the region 683 bestRegionCacheRatio = regionCacheRatio; 684 } else if ( 685 regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region] 686 ) { 687 // If two servers have same region cache ratio, then the server currently hosting the 688 // region 689 // should retain the region 690 serverWithBestRegionCacheRatio = server; 691 } 692 } 693 regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio; 694 Pair<Integer, Integer> regionServerPair = 695 new Pair<>(region, regionIndexToServerIndex[region]); 696 float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair); 697 if (tempRegionCacheRatio > bestRegionCacheRatio) { 698 LOG.warn( 699 "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the " 700 + "best region cache ratio {} on server {}", 701 regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]], 702 tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]); 703 } 704 } 705 } 706 707 protected float getOrComputeRegionCacheRatio(int region, int server) { 708 if ( 709 regionServerIndexWithBestRegionCachedRatio == null 710 || regionIndexServerIndexRegionCachedRatio.isEmpty() 711 ) { 712 computeRegionServerRegionCacheRatio(); 713 } 714 715 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 716 return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair) 717 ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair) 718 : 0.0f; 719 } 720 721 public int[] getOrComputeServerWithBestRegionCachedRatio() { 722 if ( 723 regionServerIndexWithBestRegionCachedRatio == null 724 || regionIndexServerIndexRegionCachedRatio.isEmpty() 725 ) { 726 computeRegionServerRegionCacheRatio(); 727 } 728 return regionServerIndexWithBestRegionCachedRatio; 729 } 730 731 /** 732 * Finds and return the latest reported cache ratio for the region on the RegionServer it's 733 * currently online. 734 */ 735 float getObservedRegionCacheRatio(int region) { 736 Deque<BalancerRegionLoad> dq = regionLoads[region]; 737 if (dq == null || dq.isEmpty()) { 738 return 0.0f; 739 } 740 return dq.getLast().getCurrentRegionCacheRatio(); 741 } 742 743 /** 744 * Maps region index to rack index 745 */ 746 public int getRackForRegion(int region) { 747 return serverIndexToRackIndex[regionIndexToServerIndex[region]]; 748 } 749 750 enum LocalityType { 751 SERVER, 752 RACK 753 } 754 755 public void doAction(BalanceAction action) { 756 switch (action.getType()) { 757 case NULL: 758 break; 759 case ASSIGN_REGION: 760 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings 761 assert action instanceof AssignRegionAction : action.getClass(); 762 AssignRegionAction ar = (AssignRegionAction) action; 763 regionsPerServer[ar.getServer()] = 764 addRegion(regionsPerServer[ar.getServer()], ar.getRegion()); 765 regionMoved(ar.getRegion(), -1, ar.getServer()); 766 break; 767 case MOVE_REGION: 768 assert action instanceof MoveRegionAction : action.getClass(); 769 MoveRegionAction mra = (MoveRegionAction) action; 770 regionsPerServer[mra.getFromServer()] = 771 removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion()); 772 regionsPerServer[mra.getToServer()] = 773 addRegion(regionsPerServer[mra.getToServer()], mra.getRegion()); 774 regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); 775 break; 776 case SWAP_REGIONS: 777 assert action instanceof SwapRegionsAction : action.getClass(); 778 SwapRegionsAction a = (SwapRegionsAction) action; 779 regionsPerServer[a.getFromServer()] = 780 replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion()); 781 regionsPerServer[a.getToServer()] = 782 replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion()); 783 regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); 784 regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); 785 break; 786 case MOVE_BATCH: 787 assert action instanceof MoveBatchAction : action.getClass(); 788 MoveBatchAction mba = (MoveBatchAction) action; 789 for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) { 790 Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex); 791 regionsPerServer[serverIndex] = 792 removeRegions(regionsPerServer[serverIndex], regionsToRemove); 793 } 794 for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) { 795 Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex); 796 regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd); 797 } 798 for (MoveRegionAction moveRegionAction : mba.getMoveActions()) { 799 regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(), 800 moveRegionAction.getToServer()); 801 } 802 break; 803 default: 804 throw new RuntimeException("Unknown action:" + action.getType()); 805 } 806 } 807 808 /** 809 * Return true if the placement of region on server would lower the availability of the region in 810 * question 811 * @return true or false 812 */ 813 boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { 814 if (!serversToIndex.containsKey(serverName.getAddress())) { 815 return false; // safeguard against race between cluster.servers and servers from LB method 816 // args 817 } 818 int server = serversToIndex.get(serverName.getAddress()); 819 int region = regionsToIndex.get(regionInfo); 820 821 // Region replicas for same region should better assign to different servers 822 for (int i : regionsPerServer[server]) { 823 RegionInfo otherRegionInfo = regions[i]; 824 if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) { 825 return true; 826 } 827 } 828 829 int primary = regionIndexToPrimaryIndex[region]; 830 if (primary == -1) { 831 return false; 832 } 833 // there is a subset relation for server < host < rack 834 // check server first 835 int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary); 836 if (result != 0) { 837 return result > 0; 838 } 839 840 // check host 841 if (multiServersPerHost) { 842 result = checkLocationForPrimary(serverIndexToHostIndex[server], 843 colocatedReplicaCountsPerHost, primary); 844 if (result != 0) { 845 return result > 0; 846 } 847 } 848 849 // check rack 850 if (numRacks > 1) { 851 result = checkLocationForPrimary(serverIndexToRackIndex[server], 852 colocatedReplicaCountsPerRack, primary); 853 if (result != 0) { 854 return result > 0; 855 } 856 } 857 return false; 858 } 859 860 /** 861 * Common method for better solution check. 862 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 863 * colocatedReplicaCountsPerRack 864 * @return 1 for better, -1 for no better, 0 for unknown 865 */ 866 private int checkLocationForPrimary(int location, 867 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) { 868 if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) { 869 // check for whether there are other Locations that we can place this region 870 for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) { 871 if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) { 872 return 1; // meaning there is a better Location 873 } 874 } 875 return -1; // there is not a better Location to place this 876 } 877 return 0; 878 } 879 880 void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { 881 if (!serversToIndex.containsKey(serverName.getAddress())) { 882 return; 883 } 884 int server = serversToIndex.get(serverName.getAddress()); 885 int region = regionsToIndex.get(regionInfo); 886 doAction(new AssignRegionAction(region, server)); 887 } 888 889 void regionMoved(int region, int oldServer, int newServer) { 890 regionIndexToServerIndex[region] = newServer; 891 if (initialRegionIndexToServerIndex[region] == newServer) { 892 numMovedRegions--; // region moved back to original location 893 } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { 894 numMovedRegions++; // region moved from original location 895 } 896 int tableIndex = regionIndexToTableIndex[region]; 897 if (oldServer >= 0) { 898 numRegionsPerServerPerTable[tableIndex][oldServer]--; 899 } 900 numRegionsPerServerPerTable[tableIndex][newServer]++; 901 902 // update for servers 903 int primary = regionIndexToPrimaryIndex[region]; 904 if (oldServer >= 0) { 905 colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary); 906 } 907 colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary); 908 909 // update for hosts 910 if (multiServersPerHost) { 911 updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost, 912 oldServer, newServer, primary, region); 913 } 914 915 // update for racks 916 if (numRacks > 1) { 917 updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack, 918 oldServer, newServer, primary, region); 919 } 920 } 921 922 /** 923 * Common method for per host and per Location region index updates when a region is moved. 924 * @param serverIndexToLocation serverIndexToHostIndex or serverIndexToLocationIndex 925 * @param regionsPerLocation regionsPerHost or regionsPerLocation 926 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 927 * colocatedReplicaCountsPerRack 928 */ 929 private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation, 930 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer, 931 int primary, int region) { 932 int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1; 933 int newLocation = serverIndexToLocation[newServer]; 934 if (newLocation != oldLocation) { 935 regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region); 936 colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary); 937 if (oldLocation >= 0) { 938 regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region); 939 colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary); 940 } 941 } 942 943 } 944 945 int[] removeRegion(int[] regions, int regionIndex) { 946 // TODO: this maybe costly. Consider using linked lists 947 int[] newRegions = new int[regions.length - 1]; 948 int i = 0; 949 for (i = 0; i < regions.length; i++) { 950 if (regions[i] == regionIndex) { 951 break; 952 } 953 newRegions[i] = regions[i]; 954 } 955 System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i); 956 return newRegions; 957 } 958 959 int[] addRegion(int[] regions, int regionIndex) { 960 int[] newRegions = new int[regions.length + 1]; 961 System.arraycopy(regions, 0, newRegions, 0, regions.length); 962 newRegions[newRegions.length - 1] = regionIndex; 963 return newRegions; 964 } 965 966 int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) { 967 // Calculate the size of the new regions array 968 int newSize = regions.length - regionIndicesToRemove.size(); 969 if (newSize < 0) { 970 throw new IllegalStateException( 971 "Region indices mismatch: more regions to remove than in the regions array"); 972 } 973 974 int[] newRegions = new int[newSize]; 975 int newIndex = 0; 976 977 // Copy only the regions not in the removal set 978 for (int region : regions) { 979 if (!regionIndicesToRemove.contains(region)) { 980 newRegions[newIndex++] = region; 981 } 982 } 983 984 // If the newIndex is smaller than newSize, some regions were missing from the input array 985 if (newIndex != newSize) { 986 throw new IllegalStateException("Region indices mismatch: some regions in the removal " 987 + "set were not found in the regions array"); 988 } 989 990 return newRegions; 991 } 992 993 int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) { 994 int[] newRegions = new int[regions.length + regionIndicesToAdd.size()]; 995 996 // Copy the existing regions to the new array 997 System.arraycopy(regions, 0, newRegions, 0, regions.length); 998 999 // Add the new regions at the end of the array 1000 int newIndex = regions.length; 1001 for (int regionIndex : regionIndicesToAdd) { 1002 newRegions[newIndex++] = regionIndex; 1003 } 1004 1005 return newRegions; 1006 } 1007 1008 List<Integer> getShuffledServerIndices() { 1009 return shuffledServerIndicesSupplier.get(); 1010 } 1011 1012 int[] addRegionSorted(int[] regions, int regionIndex) { 1013 int[] newRegions = new int[regions.length + 1]; 1014 int i = 0; 1015 for (i = 0; i < regions.length; i++) { // find the index to insert 1016 if (regions[i] > regionIndex) { 1017 break; 1018 } 1019 } 1020 System.arraycopy(regions, 0, newRegions, 0, i); // copy first half 1021 System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half 1022 newRegions[i] = regionIndex; 1023 1024 return newRegions; 1025 } 1026 1027 int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { 1028 int i = 0; 1029 for (i = 0; i < regions.length; i++) { 1030 if (regions[i] == regionIndex) { 1031 regions[i] = newRegionIndex; 1032 break; 1033 } 1034 } 1035 return regions; 1036 } 1037 1038 void sortServersByRegionCount() { 1039 Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); 1040 } 1041 1042 int getNumRegions(int server) { 1043 return regionsPerServer[server].length; 1044 } 1045 1046 boolean contains(int[] arr, int val) { 1047 return Arrays.binarySearch(arr, val) >= 0; 1048 } 1049 1050 private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); 1051 1052 public Comparator<Integer> getNumRegionsComparator() { 1053 return numRegionsComparator; 1054 } 1055 1056 int getLowestLocalityRegionOnServer(int serverIndex) { 1057 if (regionFinder != null) { 1058 float lowestLocality = 1.0f; 1059 int lowestLocalityRegionIndex = -1; 1060 if (regionsPerServer[serverIndex].length == 0) { 1061 // No regions on that region server 1062 return -1; 1063 } 1064 for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { 1065 int regionIndex = regionsPerServer[serverIndex][j]; 1066 HDFSBlocksDistribution distribution = 1067 regionFinder.getBlockDistribution(regions[regionIndex]); 1068 float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); 1069 // skip empty region 1070 if (distribution.getUniqueBlocksTotalWeight() == 0) { 1071 continue; 1072 } 1073 if (locality < lowestLocality) { 1074 lowestLocality = locality; 1075 lowestLocalityRegionIndex = j; 1076 } 1077 } 1078 if (lowestLocalityRegionIndex == -1) { 1079 return -1; 1080 } 1081 if (LOG.isTraceEnabled()) { 1082 LOG.trace("Lowest locality region is " 1083 + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]] 1084 .getRegionNameAsString() 1085 + " with locality " + lowestLocality + " and its region server contains " 1086 + regionsPerServer[serverIndex].length + " regions"); 1087 } 1088 return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; 1089 } else { 1090 return -1; 1091 } 1092 } 1093 1094 float getLocalityOfRegion(int region, int server) { 1095 if (regionFinder != null) { 1096 HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); 1097 return distribution.getBlockLocalityIndex(servers[server].getHostname()); 1098 } else { 1099 return 0f; 1100 } 1101 } 1102 1103 void setNumRegions(int numRegions) { 1104 this.numRegions = numRegions; 1105 } 1106 1107 void setNumMovedRegions(int numMovedRegions) { 1108 this.numMovedRegions = numMovedRegions; 1109 } 1110 1111 public int getMaxReplicas() { 1112 return maxReplicas; 1113 } 1114 1115 void setStopRequestedAt(long stopRequestedAt) { 1116 this.stopRequestedAt = stopRequestedAt; 1117 } 1118 1119 boolean isStopRequested() { 1120 return EnvironmentEdgeManager.currentTime() > stopRequestedAt; 1121 } 1122 1123 Deque<BalancerRegionLoad>[] getRegionLoads() { 1124 return regionLoads; 1125 } 1126 1127 @Override 1128 public String toString() { 1129 StringBuilder desc = new StringBuilder("Cluster={servers=["); 1130 for (ServerName sn : servers) { 1131 desc.append(sn.getAddress().toString()).append(", "); 1132 } 1133 desc.append("], serverIndicesSortedByRegionCount=") 1134 .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=") 1135 .append(Arrays.deepToString(regionsPerServer)); 1136 1137 desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) 1138 .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions) 1139 .append('}'); 1140 return desc.toString(); 1141 } 1142}