001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.TimeUnit; 031import java.util.function.Supplier; 032import org.agrona.collections.Hashing; 033import org.agrona.collections.Int2IntCounterMap; 034import org.apache.hadoop.hbase.HDFSBlocksDistribution; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.RegionReplicaUtil; 038import org.apache.hadoop.hbase.master.RackManager; 039import org.apache.hadoop.hbase.net.Address; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.hbase.util.Pair; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; 047 048/** 049 * An efficient array based implementation similar to ClusterState for keeping the status of the 050 * cluster in terms of region assignment and distribution. LoadBalancers, such as 051 * StochasticLoadBalancer uses this Cluster object because of hundreds of thousands of hashmap 052 * manipulations are very costly, which is why this class uses mostly indexes and arrays. 053 * <p/> 054 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server 055 * topology in terms of server names, hostnames and racks. 056 */ 057@InterfaceAudience.Private 058class BalancerClusterState { 059 060 private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class); 061 062 ServerName[] servers; 063 // ServerName uniquely identifies a region server. multiple RS can run on the same host 064 String[] hosts; 065 String[] racks; 066 boolean multiServersPerHost = false; // whether or not any host has more than one server 067 068 ArrayList<String> tables; 069 RegionInfo[] regions; 070 Deque<BalancerRegionLoad>[] regionLoads; 071 private RegionHDFSBlockLocationFinder regionFinder; 072 073 int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality 074 075 int[] serverIndexToHostIndex; // serverIndex -> host index 076 int[] serverIndexToRackIndex; // serverIndex -> rack index 077 078 int[][] regionsPerServer; // serverIndex -> region list 079 int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list 080 int[][] regionsPerHost; // hostIndex -> list of regions 081 int[][] regionsPerRack; // rackIndex -> region list 082 Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated 083 // replicas by primary region index 084 Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by 085 // primary region index 086 Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by 087 // primary region index 088 089 int[][] serversPerHost; // hostIndex -> list of server indexes 090 int[][] serversPerRack; // rackIndex -> list of server indexes 091 int[] regionIndexToServerIndex; // regionIndex -> serverIndex 092 int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state) 093 int[] regionIndexToTableIndex; // regionIndex -> tableIndex 094 int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> tableIndex -> # regions 095 int[] numRegionsPerTable; // tableIndex -> region count 096 double[] meanRegionsPerTable; // mean region count per table 097 int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary 098 boolean hasRegionReplicas = false; // whether there is regions with replicas 099 100 Integer[] serverIndicesSortedByRegionCount; 101 Integer[] serverIndicesSortedByLocality; 102 103 Map<Address, Integer> serversToIndex; 104 Map<String, Integer> hostsToIndex; 105 Map<String, Integer> racksToIndex; 106 Map<String, Integer> tablesToIndex; 107 Map<RegionInfo, Integer> regionsToIndex; 108 float[] localityPerServer; 109 110 int numServers; 111 int numHosts; 112 int numRacks; 113 int numTables; 114 int numRegions; 115 int maxReplicas = 1; 116 117 int numMovedRegions = 0; // num moved regions from the initial configuration 118 Map<ServerName, List<RegionInfo>> clusterState; 119 120 private final RackManager rackManager; 121 // Maps region -> rackIndex -> locality of region on rack 122 private float[][] rackLocalities; 123 // Maps localityType -> region -> [server|rack]Index with highest locality 124 private int[][] regionsToMostLocalEntities; 125 // Maps region -> serverIndex -> regionCacheRatio of a region on a server 126 private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio; 127 // Maps regionIndex -> serverIndex with best region cache ratio 128 private int[] regionServerIndexWithBestRegionCachedRatio; 129 // Maps regionName -> oldServerName -> cache ratio of the region on the old server 130 Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap; 131 132 private final Supplier<List<Integer>> shuffledServerIndicesSupplier = 133 Suppliers.memoizeWithExpiration(() -> { 134 Collection<Integer> serverIndices = serversToIndex.values(); 135 List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices); 136 Collections.shuffle(shuffledServerIndices); 137 return shuffledServerIndices; 138 }, 5, TimeUnit.SECONDS); 139 private long stopRequestedAt = Long.MAX_VALUE; 140 141 static class DefaultRackManager extends RackManager { 142 @Override 143 public String getRack(ServerName server) { 144 return UNKNOWN_RACK; 145 } 146 } 147 148 BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, 149 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder, 150 RackManager rackManager) { 151 this(null, clusterState, loads, regionFinder, rackManager, null); 152 } 153 154 protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, 155 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder, 156 RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) { 157 this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio); 158 } 159 160 @SuppressWarnings("unchecked") 161 BalancerClusterState(Collection<RegionInfo> unassignedRegions, 162 Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads, 163 RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager, 164 Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) { 165 if (unassignedRegions == null) { 166 unassignedRegions = Collections.emptyList(); 167 } 168 169 serversToIndex = new HashMap<>(); 170 hostsToIndex = new HashMap<>(); 171 racksToIndex = new HashMap<>(); 172 tablesToIndex = new HashMap<>(); 173 174 // TODO: We should get the list of tables from master 175 tables = new ArrayList<>(); 176 this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); 177 178 this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio; 179 180 numRegions = 0; 181 182 List<List<Integer>> serversPerHostList = new ArrayList<>(); 183 List<List<Integer>> serversPerRackList = new ArrayList<>(); 184 this.clusterState = clusterState; 185 this.regionFinder = regionFinder; 186 187 // Use servername and port as there can be dead servers in this list. We want everything with 188 // a matching hostname and port to have the same index. 189 for (ServerName sn : clusterState.keySet()) { 190 if (sn == null) { 191 LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " 192 + "skipping; unassigned regions?"); 193 if (LOG.isTraceEnabled()) { 194 LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); 195 } 196 continue; 197 } 198 if (serversToIndex.get(sn.getAddress()) == null) { 199 serversToIndex.put(sn.getAddress(), numServers++); 200 } 201 if (!hostsToIndex.containsKey(sn.getHostname())) { 202 hostsToIndex.put(sn.getHostname(), numHosts++); 203 serversPerHostList.add(new ArrayList<>(1)); 204 } 205 206 int serverIndex = serversToIndex.get(sn.getAddress()); 207 int hostIndex = hostsToIndex.get(sn.getHostname()); 208 serversPerHostList.get(hostIndex).add(serverIndex); 209 210 String rack = this.rackManager.getRack(sn); 211 212 if (!racksToIndex.containsKey(rack)) { 213 racksToIndex.put(rack, numRacks++); 214 serversPerRackList.add(new ArrayList<>()); 215 } 216 int rackIndex = racksToIndex.get(rack); 217 serversPerRackList.get(rackIndex).add(serverIndex); 218 } 219 220 LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex); 221 // Count how many regions there are. 222 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 223 numRegions += entry.getValue().size(); 224 } 225 numRegions += unassignedRegions.size(); 226 227 regionsToIndex = new HashMap<>(numRegions); 228 servers = new ServerName[numServers]; 229 serversPerHost = new int[numHosts][]; 230 serversPerRack = new int[numRacks][]; 231 regions = new RegionInfo[numRegions]; 232 regionIndexToServerIndex = new int[numRegions]; 233 initialRegionIndexToServerIndex = new int[numRegions]; 234 regionIndexToTableIndex = new int[numRegions]; 235 regionIndexToPrimaryIndex = new int[numRegions]; 236 regionLoads = new Deque[numRegions]; 237 238 regionLocations = new int[numRegions][]; 239 serverIndicesSortedByRegionCount = new Integer[numServers]; 240 serverIndicesSortedByLocality = new Integer[numServers]; 241 localityPerServer = new float[numServers]; 242 243 serverIndexToHostIndex = new int[numServers]; 244 serverIndexToRackIndex = new int[numServers]; 245 regionsPerServer = new int[numServers][]; 246 serverIndexToRegionsOffset = new int[numServers]; 247 regionsPerHost = new int[numHosts][]; 248 regionsPerRack = new int[numRacks][]; 249 colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers]; 250 colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts]; 251 colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks]; 252 253 int regionIndex = 0, regionPerServerIndex = 0; 254 255 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 256 if (entry.getKey() == null) { 257 LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); 258 continue; 259 } 260 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 261 262 // keep the servername if this is the first server name for this hostname 263 // or this servername has the newest startcode. 264 if ( 265 servers[serverIndex] == null 266 || servers[serverIndex].getStartcode() < entry.getKey().getStartcode() 267 ) { 268 servers[serverIndex] = entry.getKey(); 269 } 270 271 if (regionsPerServer[serverIndex] != null) { 272 // there is another server with the same hostAndPort in ClusterState. 273 // allocate the array for the total size 274 regionsPerServer[serverIndex] = 275 new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; 276 } else { 277 regionsPerServer[serverIndex] = new int[entry.getValue().size()]; 278 } 279 colocatedReplicaCountsPerServer[serverIndex] = 280 new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 281 serverIndicesSortedByRegionCount[serverIndex] = serverIndex; 282 serverIndicesSortedByLocality[serverIndex] = serverIndex; 283 } 284 285 hosts = new String[numHosts]; 286 for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) { 287 hosts[entry.getValue()] = entry.getKey(); 288 } 289 racks = new String[numRacks]; 290 for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) { 291 racks[entry.getValue()] = entry.getKey(); 292 } 293 294 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 295 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 296 regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; 297 298 int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); 299 serverIndexToHostIndex[serverIndex] = hostIndex; 300 301 int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); 302 serverIndexToRackIndex[serverIndex] = rackIndex; 303 304 for (RegionInfo region : entry.getValue()) { 305 registerRegion(region, regionIndex, serverIndex, loads, regionFinder); 306 regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; 307 regionIndex++; 308 } 309 serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex; 310 } 311 312 for (RegionInfo region : unassignedRegions) { 313 registerRegion(region, regionIndex, -1, loads, regionFinder); 314 regionIndex++; 315 } 316 317 if (LOG.isTraceEnabled()) { 318 for (int i = 0; i < numServers; i++) { 319 LOG.trace("server {} has {} regions", i, regionsPerServer[i].length); 320 } 321 } 322 for (int i = 0; i < serversPerHostList.size(); i++) { 323 serversPerHost[i] = new int[serversPerHostList.get(i).size()]; 324 for (int j = 0; j < serversPerHost[i].length; j++) { 325 serversPerHost[i][j] = serversPerHostList.get(i).get(j); 326 LOG.trace("server {} is on host {}", serversPerHostList.get(i).get(j), i); 327 } 328 if (serversPerHost[i].length > 1) { 329 multiServersPerHost = true; 330 } 331 } 332 333 for (int i = 0; i < serversPerRackList.size(); i++) { 334 serversPerRack[i] = new int[serversPerRackList.get(i).size()]; 335 for (int j = 0; j < serversPerRack[i].length; j++) { 336 serversPerRack[i][j] = serversPerRackList.get(i).get(j); 337 LOG.trace("server {} is on rack {}", serversPerRackList.get(i).get(j), i); 338 } 339 } 340 341 numTables = tables.size(); 342 LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts, 343 numRacks); 344 numRegionsPerServerPerTable = new int[numTables][numServers]; 345 numRegionsPerTable = new int[numTables]; 346 347 for (int i = 0; i < numTables; i++) { 348 for (int j = 0; j < numServers; j++) { 349 numRegionsPerServerPerTable[i][j] = 0; 350 } 351 } 352 353 for (int i = 0; i < regionIndexToServerIndex.length; i++) { 354 if (regionIndexToServerIndex[i] >= 0) { 355 numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++; 356 numRegionsPerTable[regionIndexToTableIndex[i]]++; 357 } 358 } 359 360 // Avoid repeated computation for planning 361 meanRegionsPerTable = new double[numTables]; 362 363 for (int i = 0; i < numTables; i++) { 364 meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers; 365 } 366 367 for (int i = 0; i < regions.length; i++) { 368 RegionInfo info = regions[i]; 369 if (RegionReplicaUtil.isDefaultReplica(info)) { 370 regionIndexToPrimaryIndex[i] = i; 371 } else { 372 hasRegionReplicas = true; 373 RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); 374 regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); 375 } 376 } 377 378 for (int i = 0; i < regionsPerServer.length; i++) { 379 colocatedReplicaCountsPerServer[i] = 380 new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 381 for (int j = 0; j < regionsPerServer[i].length; j++) { 382 int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; 383 colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex); 384 } 385 } 386 // compute regionsPerHost 387 if (multiServersPerHost) { 388 populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost, 389 serversPerHost); 390 } 391 392 // compute regionsPerRack 393 if (numRacks > 1) { 394 populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack, 395 serversPerRack); 396 } 397 } 398 399 private void populateRegionPerLocationFromServer(int[][] regionsPerLocation, 400 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) { 401 for (int i = 0; i < serversPerLocation.length; i++) { 402 int numRegionsPerLocation = 0; 403 for (int j = 0; j < serversPerLocation[i].length; j++) { 404 numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length; 405 } 406 regionsPerLocation[i] = new int[numRegionsPerLocation]; 407 colocatedReplicaCountsPerLocation[i] = 408 new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0); 409 } 410 411 for (int i = 0; i < serversPerLocation.length; i++) { 412 int numRegionPerLocationIndex = 0; 413 for (int j = 0; j < serversPerLocation[i].length; j++) { 414 for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) { 415 int region = regionsPerServer[serversPerLocation[i][j]][k]; 416 regionsPerLocation[i][numRegionPerLocationIndex] = region; 417 int primaryIndex = regionIndexToPrimaryIndex[region]; 418 colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex); 419 numRegionPerLocationIndex++; 420 } 421 } 422 } 423 424 } 425 426 /** Helper for Cluster constructor to handle a region */ 427 private void registerRegion(RegionInfo region, int regionIndex, int serverIndex, 428 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) { 429 String tableName = region.getTable().getNameAsString(); 430 if (!tablesToIndex.containsKey(tableName)) { 431 tables.add(tableName); 432 tablesToIndex.put(tableName, tablesToIndex.size()); 433 } 434 int tableIndex = tablesToIndex.get(tableName); 435 436 regionsToIndex.put(region, regionIndex); 437 regions[regionIndex] = region; 438 regionIndexToServerIndex[regionIndex] = serverIndex; 439 initialRegionIndexToServerIndex[regionIndex] = serverIndex; 440 regionIndexToTableIndex[regionIndex] = tableIndex; 441 442 // region load 443 if (loads != null) { 444 Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString()); 445 // That could have failed if the RegionLoad is using the other regionName 446 if (rl == null) { 447 // Try getting the region load using encoded name. 448 rl = loads.get(region.getEncodedName()); 449 } 450 regionLoads[regionIndex] = rl; 451 } 452 453 if (regionFinder != null) { 454 // region location 455 List<ServerName> loc = regionFinder.getTopBlockLocations(region); 456 regionLocations[regionIndex] = new int[loc.size()]; 457 for (int i = 0; i < loc.size(); i++) { 458 regionLocations[regionIndex][i] = loc.get(i) == null 459 ? -1 460 : (serversToIndex.get(loc.get(i).getAddress()) == null 461 ? -1 462 : serversToIndex.get(loc.get(i).getAddress())); 463 } 464 } 465 466 int numReplicas = region.getReplicaId() + 1; 467 if (numReplicas > maxReplicas) { 468 maxReplicas = numReplicas; 469 } 470 } 471 472 /** 473 * Returns true iff a given server has less regions than the balanced amount 474 */ 475 public boolean serverHasTooFewRegions(int server) { 476 int minLoad = this.numRegions / numServers; 477 int numRegions = getNumRegions(server); 478 return numRegions < minLoad; 479 } 480 481 /** 482 * Retrieves and lazily initializes a field storing the locality of every region/server 483 * combination 484 */ 485 public float[][] getOrComputeRackLocalities() { 486 if (rackLocalities == null || regionsToMostLocalEntities == null) { 487 computeCachedLocalities(); 488 } 489 return rackLocalities; 490 } 491 492 /** 493 * Lazily initializes and retrieves a mapping of region -> server for which region has the highest 494 * the locality 495 */ 496 public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) { 497 if (rackLocalities == null || regionsToMostLocalEntities == null) { 498 computeCachedLocalities(); 499 } 500 return regionsToMostLocalEntities[type.ordinal()]; 501 } 502 503 /** 504 * Looks up locality from cache of localities. Will create cache if it does not already exist. 505 */ 506 public float getOrComputeLocality(int region, int entity, 507 BalancerClusterState.LocalityType type) { 508 switch (type) { 509 case SERVER: 510 return getLocalityOfRegion(region, entity); 511 case RACK: 512 return getOrComputeRackLocalities()[region][entity]; 513 default: 514 throw new IllegalArgumentException("Unsupported LocalityType: " + type); 515 } 516 } 517 518 /** 519 * Returns locality weighted by region size in MB. Will create locality cache if it does not 520 * already exist. 521 */ 522 public double getOrComputeWeightedLocality(int region, int server, 523 BalancerClusterState.LocalityType type) { 524 return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); 525 } 526 527 /** 528 * Returns the size in MB from the most recent RegionLoad for region 529 */ 530 public int getRegionSizeMB(int region) { 531 Deque<BalancerRegionLoad> load = regionLoads[region]; 532 // This means regions have no actual data on disk 533 if (load == null) { 534 return 0; 535 } 536 return regionLoads[region].getLast().getStorefileSizeMB(); 537 } 538 539 /** 540 * Computes and caches the locality for each region/rack combinations, as well as storing a 541 * mapping of region -> server and region -> rack such that server and rack have the highest 542 * locality for region 543 */ 544 private void computeCachedLocalities() { 545 rackLocalities = new float[numRegions][numRacks]; 546 regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; 547 548 // Compute localities and find most local server per region 549 for (int region = 0; region < numRegions; region++) { 550 int serverWithBestLocality = 0; 551 float bestLocalityForRegion = 0; 552 for (int server = 0; server < numServers; server++) { 553 // Aggregate per-rack locality 554 float locality = getLocalityOfRegion(region, server); 555 int rack = serverIndexToRackIndex[server]; 556 int numServersInRack = serversPerRack[rack].length; 557 rackLocalities[region][rack] += locality / numServersInRack; 558 559 if (locality > bestLocalityForRegion) { 560 serverWithBestLocality = server; 561 bestLocalityForRegion = locality; 562 } 563 } 564 regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; 565 566 // Find most local rack per region 567 int rackWithBestLocality = 0; 568 float bestRackLocalityForRegion = 0.0f; 569 for (int rack = 0; rack < numRacks; rack++) { 570 float rackLocality = rackLocalities[region][rack]; 571 if (rackLocality > bestRackLocalityForRegion) { 572 bestRackLocalityForRegion = rackLocality; 573 rackWithBestLocality = rack; 574 } 575 } 576 regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; 577 } 578 579 } 580 581 /** 582 * Returns the size of hFiles from the most recent RegionLoad for region 583 */ 584 public int getTotalRegionHFileSizeMB(int region) { 585 Deque<BalancerRegionLoad> load = regionLoads[region]; 586 if (load == null) { 587 // This means, that the region has no actual data on disk 588 return 0; 589 } 590 return regionLoads[region].getLast().getRegionSizeMB(); 591 } 592 593 /** 594 * Returns the weighted cache ratio of a region on the given region server 595 */ 596 public float getOrComputeWeightedRegionCacheRatio(int region, int server) { 597 return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server); 598 } 599 600 /** 601 * Returns the amount by which a region is cached on a given region server. If the region is not 602 * currently hosted on the given region server, then find out if it was previously hosted there 603 * and return the old cache ratio. 604 */ 605 protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) { 606 float regionCacheRatio = 0.0f; 607 608 // Get the current region cache ratio if the region is hosted on the server regionServerIndex 609 for (int regionIndex : regionsPerServer[regionServerIndex]) { 610 if (region != regionIndex) { 611 continue; 612 } 613 614 Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex]; 615 616 // The region is currently hosted on this region server. Get the region cache ratio for this 617 // region on this server 618 regionCacheRatio = 619 regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio(); 620 621 return regionCacheRatio; 622 } 623 624 // Region is not currently hosted on this server. Check if the region was cached on this 625 // server earlier. This can happen when the server was shutdown and the cache was persisted. 626 // Search using the region name and server name and not the index id and server id as these ids 627 // may change when a server is marked as dead or a new server is added. 628 String regionEncodedName = regions[region].getEncodedName(); 629 ServerName serverName = servers[regionServerIndex]; 630 if ( 631 regionCacheRatioOnOldServerMap != null 632 && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName) 633 ) { 634 Pair<ServerName, Float> cacheRatioOfRegionOnServer = 635 regionCacheRatioOnOldServerMap.get(regionEncodedName); 636 if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) { 637 regionCacheRatio = cacheRatioOfRegionOnServer.getSecond(); 638 if (LOG.isDebugEnabled()) { 639 LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName, 640 serverName, regionCacheRatio); 641 } 642 } 643 } 644 return regionCacheRatio; 645 } 646 647 /** 648 * Populate the maps containing information about how much a region is cached on a region server. 649 */ 650 private void computeRegionServerRegionCacheRatio() { 651 regionIndexServerIndexRegionCachedRatio = new HashMap<>(); 652 regionServerIndexWithBestRegionCachedRatio = new int[numRegions]; 653 654 for (int region = 0; region < numRegions; region++) { 655 float bestRegionCacheRatio = 0.0f; 656 int serverWithBestRegionCacheRatio = 0; 657 for (int server = 0; server < numServers; server++) { 658 float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server); 659 if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) { 660 // A region with cache ratio 0 on a server means nothing. Hence, just make a note of 661 // cache ratio only if the cache ratio is greater than 0. 662 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 663 regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio); 664 } 665 if (regionCacheRatio > bestRegionCacheRatio) { 666 serverWithBestRegionCacheRatio = server; 667 // If the server currently hosting the region has equal cache ratio to a historical 668 // server, consider the current server to keep hosting the region 669 bestRegionCacheRatio = regionCacheRatio; 670 } else if ( 671 regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region] 672 ) { 673 // If two servers have same region cache ratio, then the server currently hosting the 674 // region 675 // should retain the region 676 serverWithBestRegionCacheRatio = server; 677 } 678 } 679 regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio; 680 Pair<Integer, Integer> regionServerPair = 681 new Pair<>(region, regionIndexToServerIndex[region]); 682 float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair); 683 if (tempRegionCacheRatio > bestRegionCacheRatio) { 684 LOG.warn( 685 "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the " 686 + "best region cache ratio {} on server {}", 687 regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]], 688 tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]); 689 } 690 } 691 } 692 693 protected float getOrComputeRegionCacheRatio(int region, int server) { 694 if ( 695 regionServerIndexWithBestRegionCachedRatio == null 696 || regionIndexServerIndexRegionCachedRatio.isEmpty() 697 ) { 698 computeRegionServerRegionCacheRatio(); 699 } 700 701 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 702 return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair) 703 ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair) 704 : 0.0f; 705 } 706 707 public int[] getOrComputeServerWithBestRegionCachedRatio() { 708 if ( 709 regionServerIndexWithBestRegionCachedRatio == null 710 || regionIndexServerIndexRegionCachedRatio.isEmpty() 711 ) { 712 computeRegionServerRegionCacheRatio(); 713 } 714 return regionServerIndexWithBestRegionCachedRatio; 715 } 716 717 /** 718 * Maps region index to rack index 719 */ 720 public int getRackForRegion(int region) { 721 return serverIndexToRackIndex[regionIndexToServerIndex[region]]; 722 } 723 724 enum LocalityType { 725 SERVER, 726 RACK 727 } 728 729 public void doAction(BalanceAction action) { 730 switch (action.getType()) { 731 case NULL: 732 break; 733 case ASSIGN_REGION: 734 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings 735 assert action instanceof AssignRegionAction : action.getClass(); 736 AssignRegionAction ar = (AssignRegionAction) action; 737 regionsPerServer[ar.getServer()] = 738 addRegion(regionsPerServer[ar.getServer()], ar.getRegion()); 739 regionMoved(ar.getRegion(), -1, ar.getServer()); 740 break; 741 case MOVE_REGION: 742 assert action instanceof MoveRegionAction : action.getClass(); 743 MoveRegionAction mra = (MoveRegionAction) action; 744 regionsPerServer[mra.getFromServer()] = 745 removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion()); 746 regionsPerServer[mra.getToServer()] = 747 addRegion(regionsPerServer[mra.getToServer()], mra.getRegion()); 748 regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); 749 break; 750 case SWAP_REGIONS: 751 assert action instanceof SwapRegionsAction : action.getClass(); 752 SwapRegionsAction a = (SwapRegionsAction) action; 753 regionsPerServer[a.getFromServer()] = 754 replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion()); 755 regionsPerServer[a.getToServer()] = 756 replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion()); 757 regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); 758 regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); 759 break; 760 case MOVE_BATCH: 761 assert action instanceof MoveBatchAction : action.getClass(); 762 MoveBatchAction mba = (MoveBatchAction) action; 763 for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) { 764 Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex); 765 regionsPerServer[serverIndex] = 766 removeRegions(regionsPerServer[serverIndex], regionsToRemove); 767 } 768 for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) { 769 Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex); 770 regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd); 771 } 772 for (MoveRegionAction moveRegionAction : mba.getMoveActions()) { 773 regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(), 774 moveRegionAction.getToServer()); 775 } 776 break; 777 default: 778 throw new RuntimeException("Unknown action:" + action.getType()); 779 } 780 } 781 782 /** 783 * Return true if the placement of region on server would lower the availability of the region in 784 * question 785 * @return true or false 786 */ 787 boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { 788 if (!serversToIndex.containsKey(serverName.getAddress())) { 789 return false; // safeguard against race between cluster.servers and servers from LB method 790 // args 791 } 792 int server = serversToIndex.get(serverName.getAddress()); 793 int region = regionsToIndex.get(regionInfo); 794 795 // Region replicas for same region should better assign to different servers 796 for (int i : regionsPerServer[server]) { 797 RegionInfo otherRegionInfo = regions[i]; 798 if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) { 799 return true; 800 } 801 } 802 803 int primary = regionIndexToPrimaryIndex[region]; 804 if (primary == -1) { 805 return false; 806 } 807 // there is a subset relation for server < host < rack 808 // check server first 809 int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary); 810 if (result != 0) { 811 return result > 0; 812 } 813 814 // check host 815 if (multiServersPerHost) { 816 result = checkLocationForPrimary(serverIndexToHostIndex[server], 817 colocatedReplicaCountsPerHost, primary); 818 if (result != 0) { 819 return result > 0; 820 } 821 } 822 823 // check rack 824 if (numRacks > 1) { 825 result = checkLocationForPrimary(serverIndexToRackIndex[server], 826 colocatedReplicaCountsPerRack, primary); 827 if (result != 0) { 828 return result > 0; 829 } 830 } 831 return false; 832 } 833 834 /** 835 * Common method for better solution check. 836 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 837 * colocatedReplicaCountsPerRack 838 * @return 1 for better, -1 for no better, 0 for unknown 839 */ 840 private int checkLocationForPrimary(int location, 841 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) { 842 if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) { 843 // check for whether there are other Locations that we can place this region 844 for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) { 845 if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) { 846 return 1; // meaning there is a better Location 847 } 848 } 849 return -1; // there is not a better Location to place this 850 } 851 return 0; 852 } 853 854 void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { 855 if (!serversToIndex.containsKey(serverName.getAddress())) { 856 return; 857 } 858 int server = serversToIndex.get(serverName.getAddress()); 859 int region = regionsToIndex.get(regionInfo); 860 doAction(new AssignRegionAction(region, server)); 861 } 862 863 void regionMoved(int region, int oldServer, int newServer) { 864 regionIndexToServerIndex[region] = newServer; 865 if (initialRegionIndexToServerIndex[region] == newServer) { 866 numMovedRegions--; // region moved back to original location 867 } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { 868 numMovedRegions++; // region moved from original location 869 } 870 int tableIndex = regionIndexToTableIndex[region]; 871 if (oldServer >= 0) { 872 numRegionsPerServerPerTable[tableIndex][oldServer]--; 873 } 874 numRegionsPerServerPerTable[tableIndex][newServer]++; 875 876 // update for servers 877 int primary = regionIndexToPrimaryIndex[region]; 878 if (oldServer >= 0) { 879 colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary); 880 } 881 colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary); 882 883 // update for hosts 884 if (multiServersPerHost) { 885 updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost, 886 oldServer, newServer, primary, region); 887 } 888 889 // update for racks 890 if (numRacks > 1) { 891 updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack, 892 oldServer, newServer, primary, region); 893 } 894 } 895 896 /** 897 * Common method for per host and per Location region index updates when a region is moved. 898 * @param serverIndexToLocation serverIndexToHostIndex or serverIndexToLocationIndex 899 * @param regionsPerLocation regionsPerHost or regionsPerLocation 900 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 901 * colocatedReplicaCountsPerRack 902 */ 903 private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation, 904 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer, 905 int primary, int region) { 906 int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1; 907 int newLocation = serverIndexToLocation[newServer]; 908 if (newLocation != oldLocation) { 909 regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region); 910 colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary); 911 if (oldLocation >= 0) { 912 regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region); 913 colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary); 914 } 915 } 916 917 } 918 919 int[] removeRegion(int[] regions, int regionIndex) { 920 // TODO: this maybe costly. Consider using linked lists 921 int[] newRegions = new int[regions.length - 1]; 922 int i = 0; 923 for (i = 0; i < regions.length; i++) { 924 if (regions[i] == regionIndex) { 925 break; 926 } 927 newRegions[i] = regions[i]; 928 } 929 System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i); 930 return newRegions; 931 } 932 933 int[] addRegion(int[] regions, int regionIndex) { 934 int[] newRegions = new int[regions.length + 1]; 935 System.arraycopy(regions, 0, newRegions, 0, regions.length); 936 newRegions[newRegions.length - 1] = regionIndex; 937 return newRegions; 938 } 939 940 int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) { 941 // Calculate the size of the new regions array 942 int newSize = regions.length - regionIndicesToRemove.size(); 943 if (newSize < 0) { 944 throw new IllegalStateException( 945 "Region indices mismatch: more regions to remove than in the regions array"); 946 } 947 948 int[] newRegions = new int[newSize]; 949 int newIndex = 0; 950 951 // Copy only the regions not in the removal set 952 for (int region : regions) { 953 if (!regionIndicesToRemove.contains(region)) { 954 newRegions[newIndex++] = region; 955 } 956 } 957 958 // If the newIndex is smaller than newSize, some regions were missing from the input array 959 if (newIndex != newSize) { 960 throw new IllegalStateException("Region indices mismatch: some regions in the removal " 961 + "set were not found in the regions array"); 962 } 963 964 return newRegions; 965 } 966 967 int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) { 968 int[] newRegions = new int[regions.length + regionIndicesToAdd.size()]; 969 970 // Copy the existing regions to the new array 971 System.arraycopy(regions, 0, newRegions, 0, regions.length); 972 973 // Add the new regions at the end of the array 974 int newIndex = regions.length; 975 for (int regionIndex : regionIndicesToAdd) { 976 newRegions[newIndex++] = regionIndex; 977 } 978 979 return newRegions; 980 } 981 982 List<Integer> getShuffledServerIndices() { 983 return shuffledServerIndicesSupplier.get(); 984 } 985 986 int[] addRegionSorted(int[] regions, int regionIndex) { 987 int[] newRegions = new int[regions.length + 1]; 988 int i = 0; 989 for (i = 0; i < regions.length; i++) { // find the index to insert 990 if (regions[i] > regionIndex) { 991 break; 992 } 993 } 994 System.arraycopy(regions, 0, newRegions, 0, i); // copy first half 995 System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half 996 newRegions[i] = regionIndex; 997 998 return newRegions; 999 } 1000 1001 int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { 1002 int i = 0; 1003 for (i = 0; i < regions.length; i++) { 1004 if (regions[i] == regionIndex) { 1005 regions[i] = newRegionIndex; 1006 break; 1007 } 1008 } 1009 return regions; 1010 } 1011 1012 void sortServersByRegionCount() { 1013 Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); 1014 } 1015 1016 int getNumRegions(int server) { 1017 return regionsPerServer[server].length; 1018 } 1019 1020 boolean contains(int[] arr, int val) { 1021 return Arrays.binarySearch(arr, val) >= 0; 1022 } 1023 1024 private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); 1025 1026 public Comparator<Integer> getNumRegionsComparator() { 1027 return numRegionsComparator; 1028 } 1029 1030 int getLowestLocalityRegionOnServer(int serverIndex) { 1031 if (regionFinder != null) { 1032 float lowestLocality = 1.0f; 1033 int lowestLocalityRegionIndex = -1; 1034 if (regionsPerServer[serverIndex].length == 0) { 1035 // No regions on that region server 1036 return -1; 1037 } 1038 for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { 1039 int regionIndex = regionsPerServer[serverIndex][j]; 1040 HDFSBlocksDistribution distribution = 1041 regionFinder.getBlockDistribution(regions[regionIndex]); 1042 float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); 1043 // skip empty region 1044 if (distribution.getUniqueBlocksTotalWeight() == 0) { 1045 continue; 1046 } 1047 if (locality < lowestLocality) { 1048 lowestLocality = locality; 1049 lowestLocalityRegionIndex = j; 1050 } 1051 } 1052 if (lowestLocalityRegionIndex == -1) { 1053 return -1; 1054 } 1055 if (LOG.isTraceEnabled()) { 1056 LOG.trace("Lowest locality region is " 1057 + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]] 1058 .getRegionNameAsString() 1059 + " with locality " + lowestLocality + " and its region server contains " 1060 + regionsPerServer[serverIndex].length + " regions"); 1061 } 1062 return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; 1063 } else { 1064 return -1; 1065 } 1066 } 1067 1068 float getLocalityOfRegion(int region, int server) { 1069 if (regionFinder != null) { 1070 HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); 1071 return distribution.getBlockLocalityIndex(servers[server].getHostname()); 1072 } else { 1073 return 0f; 1074 } 1075 } 1076 1077 void setNumRegions(int numRegions) { 1078 this.numRegions = numRegions; 1079 } 1080 1081 void setNumMovedRegions(int numMovedRegions) { 1082 this.numMovedRegions = numMovedRegions; 1083 } 1084 1085 public int getMaxReplicas() { 1086 return maxReplicas; 1087 } 1088 1089 void setStopRequestedAt(long stopRequestedAt) { 1090 this.stopRequestedAt = stopRequestedAt; 1091 } 1092 1093 boolean isStopRequested() { 1094 return EnvironmentEdgeManager.currentTime() > stopRequestedAt; 1095 } 1096 1097 @Override 1098 public String toString() { 1099 StringBuilder desc = new StringBuilder("Cluster={servers=["); 1100 for (ServerName sn : servers) { 1101 desc.append(sn.getAddress().toString()).append(", "); 1102 } 1103 desc.append("], serverIndicesSortedByRegionCount=") 1104 .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=") 1105 .append(Arrays.deepToString(regionsPerServer)); 1106 1107 desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) 1108 .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions) 1109 .append('}'); 1110 return desc.toString(); 1111 } 1112}