001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.TimeUnit; 031import java.util.function.Supplier; 032import org.agrona.collections.Hashing; 033import org.agrona.collections.Int2IntCounterMap; 034import org.apache.hadoop.hbase.HDFSBlocksDistribution; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.RegionReplicaUtil; 038import org.apache.hadoop.hbase.master.RackManager; 039import org.apache.hadoop.hbase.net.Address; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.hbase.util.Pair; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; 047 048/** 049 * An efficient array based implementation similar to ClusterState for keeping the status of the 050 * cluster in terms of region assignment and distribution. LoadBalancers, such as 051 * StochasticLoadBalancer uses this Cluster object because of hundreds of thousands of hashmap 052 * manipulations are very costly, which is why this class uses mostly indexes and arrays. 053 * <p/> 054 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server 055 * topology in terms of server names, hostnames and racks. 056 */ 057@InterfaceAudience.Private 058class BalancerClusterState { 059 060 private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class); 061 062 ServerName[] servers; 063 // ServerName uniquely identifies a region server. multiple RS can run on the same host 064 String[] hosts; 065 String[] racks; 066 boolean multiServersPerHost = false; // whether or not any host has more than one server 067 068 ArrayList<String> tables; 069 RegionInfo[] regions; 070 Deque<BalancerRegionLoad>[] regionLoads; 071 private RegionHDFSBlockLocationFinder regionFinder; 072 073 int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality 074 075 int[] serverIndexToHostIndex; // serverIndex -> host index 076 int[] serverIndexToRackIndex; // serverIndex -> rack index 077 078 int[][] regionsPerServer; // serverIndex -> region list 079 int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list 080 int[][] regionsPerHost; // hostIndex -> list of regions 081 int[][] regionsPerRack; // rackIndex -> region list 082 Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated 083 // replicas by primary region index 084 Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by 085 // primary region index 086 Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by 087 // primary region index 088 089 int[][] serversPerHost; // hostIndex -> list of server indexes 090 int[][] serversPerRack; // rackIndex -> list of server indexes 091 int[] regionIndexToServerIndex; // regionIndex -> serverIndex 092 int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state) 093 int[] regionIndexToTableIndex; // regionIndex -> tableIndex 094 int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> tableIndex -> # regions 095 int[] numRegionsPerTable; // tableIndex -> region count 096 double[] meanRegionsPerTable; // mean region count per table 097 int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary 098 boolean hasRegionReplicas = false; // whether there is regions with replicas 099 100 Integer[] serverIndicesSortedByRegionCount; 101 Integer[] serverIndicesSortedByLocality; 102 103 Map<Address, Integer> serversToIndex; 104 Map<String, Integer> hostsToIndex; 105 Map<String, Integer> racksToIndex; 106 Map<String, Integer> tablesToIndex; 107 Map<RegionInfo, Integer> regionsToIndex; 108 float[] localityPerServer; 109 110 int numServers; 111 int numHosts; 112 int numRacks; 113 int numTables; 114 int numRegions; 115 int maxReplicas = 1; 116 117 int numMovedRegions = 0; // num moved regions from the initial configuration 118 Map<ServerName, List<RegionInfo>> clusterState; 119 120 private final RackManager rackManager; 121 // Maps region -> rackIndex -> locality of region on rack 122 private float[][] rackLocalities; 123 // Maps localityType -> region -> [server|rack]Index with highest locality 124 private int[][] regionsToMostLocalEntities; 125 // Maps region -> serverIndex -> regionCacheRatio of a region on a server 126 private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio; 127 // Maps regionIndex -> serverIndex with best region cache ratio 128 private int[] regionServerIndexWithBestRegionCachedRatio; 129 // Maps regionName -> oldServerName -> cache ratio of the region on the old server 130 Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap; 131 // cache free space available on each server, aligned to the "servers" array indices; 132 long[] serverBlockCacheFreeSize; 133 134 private final Supplier<List<Integer>> shuffledServerIndicesSupplier = 135 Suppliers.memoizeWithExpiration(() -> { 136 Collection<Integer> serverIndices = serversToIndex.values(); 137 List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices); 138 Collections.shuffle(shuffledServerIndices); 139 return shuffledServerIndices; 140 }, 5, TimeUnit.SECONDS); 141 private long stopRequestedAt = Long.MAX_VALUE; 142 143 static class DefaultRackManager extends RackManager { 144 @Override 145 public String getRack(ServerName server) { 146 return UNKNOWN_RACK; 147 } 148 } 149 150 BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, 151 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder, 152 RackManager rackManager) { 153 this(null, clusterState, loads, regionFinder, rackManager, null, null); 154 } 155 156 protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, 157 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder, 158 RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio, 159 Map<ServerName, Long> serverBlockCacheFreeByServer) { 160 this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio, 161 serverBlockCacheFreeByServer); 162 } 163 164 @SuppressWarnings("unchecked") 165 BalancerClusterState(Collection<RegionInfo> unassignedRegions, 166 Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads, 167 RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager, 168 Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio, 169 Map<ServerName, Long> serverBlockCacheFreeByServer) { 170 if (unassignedRegions == null) { 171 unassignedRegions = Collections.emptyList(); 172 } 173 174 serversToIndex = new HashMap<>(); 175 hostsToIndex = new HashMap<>(); 176 racksToIndex = new HashMap<>(); 177 tablesToIndex = new HashMap<>(); 178 179 // TODO: We should get the list of tables from master 180 tables = new ArrayList<>(); 181 this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); 182 183 this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio; 184 185 numRegions = 0; 186 187 List<List<Integer>> serversPerHostList = new ArrayList<>(); 188 List<List<Integer>> serversPerRackList = new ArrayList<>(); 189 this.clusterState = clusterState; 190 this.regionFinder = regionFinder; 191 192 // Use servername and port as there can be dead servers in this list. We want everything with 193 // a matching hostname and port to have the same index. 194 for (ServerName sn : clusterState.keySet()) { 195 if (sn == null) { 196 LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " 197 + "skipping; unassigned regions?"); 198 if (LOG.isTraceEnabled()) { 199 LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); 200 } 201 continue; 202 } 203 if (serversToIndex.get(sn.getAddress()) == null) { 204 serversToIndex.put(sn.getAddress(), numServers++); 205 } 206 if (!hostsToIndex.containsKey(sn.getHostname())) { 207 hostsToIndex.put(sn.getHostname(), numHosts++); 208 serversPerHostList.add(new ArrayList<>(1)); 209 } 210 211 int serverIndex = serversToIndex.get(sn.getAddress()); 212 int hostIndex = hostsToIndex.get(sn.getHostname()); 213 serversPerHostList.get(hostIndex).add(serverIndex); 214 215 String rack = this.rackManager.getRack(sn); 216 217 if (!racksToIndex.containsKey(rack)) { 218 racksToIndex.put(rack, numRacks++); 219 serversPerRackList.add(new ArrayList<>()); 220 } 221 int rackIndex = racksToIndex.get(rack); 222 serversPerRackList.get(rackIndex).add(serverIndex); 223 } 224 225 LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex); 226 // Count how many regions there are. 227 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 228 numRegions += entry.getValue().size(); 229 } 230 numRegions += unassignedRegions.size(); 231 232 regionsToIndex = new HashMap<>(numRegions); 233 servers = new ServerName[numServers]; 234 serversPerHost = new int[numHosts][]; 235 serversPerRack = new int[numRacks][]; 236 regions = new RegionInfo[numRegions]; 237 regionIndexToServerIndex = new int[numRegions]; 238 initialRegionIndexToServerIndex = new int[numRegions]; 239 regionIndexToTableIndex = new int[numRegions]; 240 regionIndexToPrimaryIndex = new int[numRegions]; 241 regionLoads = new Deque[numRegions]; 242 243 regionLocations = new int[numRegions][]; 244 serverIndicesSortedByRegionCount = new Integer[numServers]; 245 serverIndicesSortedByLocality = new Integer[numServers]; 246 localityPerServer = new float[numServers]; 247 248 serverIndexToHostIndex = new int[numServers]; 249 serverIndexToRackIndex = new int[numServers]; 250 regionsPerServer = new int[numServers][]; 251 serverIndexToRegionsOffset = new int[numServers]; 252 regionsPerHost = new int[numHosts][]; 253 regionsPerRack = new int[numRacks][]; 254 colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers]; 255 colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts]; 256 colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks]; 257 258 int regionIndex = 0, regionPerServerIndex = 0; 259 260 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 261 if (entry.getKey() == null) { 262 LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); 263 continue; 264 } 265 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 266 267 // keep the servername if this is the first server name for this hostname 268 // or this servername has the newest startcode. 269 if ( 270 servers[serverIndex] == null 271 || servers[serverIndex].getStartcode() < entry.getKey().getStartcode() 272 ) { 273 servers[serverIndex] = entry.getKey(); 274 } 275 276 if (regionsPerServer[serverIndex] != null) { 277 // there is another server with the same hostAndPort in ClusterState. 278 // allocate the array for the total size 279 regionsPerServer[serverIndex] = 280 new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; 281 } else { 282 regionsPerServer[serverIndex] = new int[entry.getValue().size()]; 283 } 284 colocatedReplicaCountsPerServer[serverIndex] = 285 new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 286 serverIndicesSortedByRegionCount[serverIndex] = serverIndex; 287 serverIndicesSortedByLocality[serverIndex] = serverIndex; 288 } 289 290 hosts = new String[numHosts]; 291 for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) { 292 hosts[entry.getValue()] = entry.getKey(); 293 } 294 racks = new String[numRacks]; 295 for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) { 296 racks[entry.getValue()] = entry.getKey(); 297 } 298 299 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 300 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 301 regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; 302 303 int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); 304 serverIndexToHostIndex[serverIndex] = hostIndex; 305 306 int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); 307 serverIndexToRackIndex[serverIndex] = rackIndex; 308 309 for (RegionInfo region : entry.getValue()) { 310 registerRegion(region, regionIndex, serverIndex, loads, regionFinder); 311 regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; 312 regionIndex++; 313 } 314 serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex; 315 } 316 317 for (RegionInfo region : unassignedRegions) { 318 registerRegion(region, regionIndex, -1, loads, regionFinder); 319 regionIndex++; 320 } 321 322 if (LOG.isTraceEnabled()) { 323 for (int i = 0; i < numServers; i++) { 324 LOG.trace("server {} has {} regions", i, regionsPerServer[i].length); 325 } 326 } 327 for (int i = 0; i < serversPerHostList.size(); i++) { 328 serversPerHost[i] = new int[serversPerHostList.get(i).size()]; 329 for (int j = 0; j < serversPerHost[i].length; j++) { 330 serversPerHost[i][j] = serversPerHostList.get(i).get(j); 331 LOG.trace("server {} is on host {}", serversPerHostList.get(i).get(j), i); 332 } 333 if (serversPerHost[i].length > 1) { 334 multiServersPerHost = true; 335 } 336 } 337 338 for (int i = 0; i < serversPerRackList.size(); i++) { 339 serversPerRack[i] = new int[serversPerRackList.get(i).size()]; 340 for (int j = 0; j < serversPerRack[i].length; j++) { 341 serversPerRack[i][j] = serversPerRackList.get(i).get(j); 342 LOG.trace("server {} is on rack {}", serversPerRackList.get(i).get(j), i); 343 } 344 } 345 346 numTables = tables.size(); 347 LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts, 348 numRacks); 349 numRegionsPerServerPerTable = new int[numTables][numServers]; 350 numRegionsPerTable = new int[numTables]; 351 352 for (int i = 0; i < numTables; i++) { 353 for (int j = 0; j < numServers; j++) { 354 numRegionsPerServerPerTable[i][j] = 0; 355 } 356 } 357 358 for (int i = 0; i < regionIndexToServerIndex.length; i++) { 359 if (regionIndexToServerIndex[i] >= 0) { 360 numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++; 361 numRegionsPerTable[regionIndexToTableIndex[i]]++; 362 } 363 } 364 365 // Avoid repeated computation for planning 366 meanRegionsPerTable = new double[numTables]; 367 368 for (int i = 0; i < numTables; i++) { 369 meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers; 370 } 371 372 for (int i = 0; i < regions.length; i++) { 373 RegionInfo info = regions[i]; 374 if (RegionReplicaUtil.isDefaultReplica(info)) { 375 regionIndexToPrimaryIndex[i] = i; 376 } else { 377 hasRegionReplicas = true; 378 RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); 379 regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); 380 } 381 } 382 383 for (int i = 0; i < regionsPerServer.length; i++) { 384 colocatedReplicaCountsPerServer[i] = 385 new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 386 for (int j = 0; j < regionsPerServer[i].length; j++) { 387 int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; 388 colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex); 389 } 390 } 391 // compute regionsPerHost 392 if (multiServersPerHost) { 393 populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost, 394 serversPerHost); 395 } 396 397 // compute regionsPerRack 398 if (numRacks > 1) { 399 populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack, 400 serversPerRack); 401 } 402 403 this.serverBlockCacheFreeSize = new long[numServers]; 404 if (serverBlockCacheFreeByServer != null) { 405 for (int i = 0; i < numServers; i++) { 406 ServerName sn = servers[i]; 407 this.serverBlockCacheFreeSize[i] = 408 sn == null ? 0L : serverBlockCacheFreeByServer.getOrDefault(sn, 0L); 409 } 410 } 411 } 412 413 private void populateRegionPerLocationFromServer(int[][] regionsPerLocation, 414 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) { 415 for (int i = 0; i < serversPerLocation.length; i++) { 416 int numRegionsPerLocation = 0; 417 for (int j = 0; j < serversPerLocation[i].length; j++) { 418 numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length; 419 } 420 regionsPerLocation[i] = new int[numRegionsPerLocation]; 421 colocatedReplicaCountsPerLocation[i] = 422 new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0); 423 } 424 425 for (int i = 0; i < serversPerLocation.length; i++) { 426 int numRegionPerLocationIndex = 0; 427 for (int j = 0; j < serversPerLocation[i].length; j++) { 428 for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) { 429 int region = regionsPerServer[serversPerLocation[i][j]][k]; 430 regionsPerLocation[i][numRegionPerLocationIndex] = region; 431 int primaryIndex = regionIndexToPrimaryIndex[region]; 432 colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex); 433 numRegionPerLocationIndex++; 434 } 435 } 436 } 437 438 } 439 440 /** Helper for Cluster constructor to handle a region */ 441 private void registerRegion(RegionInfo region, int regionIndex, int serverIndex, 442 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) { 443 String tableName = region.getTable().getNameAsString(); 444 if (!tablesToIndex.containsKey(tableName)) { 445 tables.add(tableName); 446 tablesToIndex.put(tableName, tablesToIndex.size()); 447 } 448 int tableIndex = tablesToIndex.get(tableName); 449 450 regionsToIndex.put(region, regionIndex); 451 regions[regionIndex] = region; 452 regionIndexToServerIndex[regionIndex] = serverIndex; 453 initialRegionIndexToServerIndex[regionIndex] = serverIndex; 454 regionIndexToTableIndex[regionIndex] = tableIndex; 455 456 // region load 457 if (loads != null) { 458 Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString()); 459 // That could have failed if the RegionLoad is using the other regionName 460 if (rl == null) { 461 // Try getting the region load using encoded name. 462 rl = loads.get(region.getEncodedName()); 463 } 464 regionLoads[regionIndex] = rl; 465 } 466 467 if (regionFinder != null) { 468 // region location 469 List<ServerName> loc = regionFinder.getTopBlockLocations(region); 470 regionLocations[regionIndex] = new int[loc.size()]; 471 for (int i = 0; i < loc.size(); i++) { 472 regionLocations[regionIndex][i] = loc.get(i) == null 473 ? -1 474 : (serversToIndex.get(loc.get(i).getAddress()) == null 475 ? -1 476 : serversToIndex.get(loc.get(i).getAddress())); 477 } 478 } 479 480 int numReplicas = region.getReplicaId() + 1; 481 if (numReplicas > maxReplicas) { 482 maxReplicas = numReplicas; 483 } 484 } 485 486 /** 487 * Returns true iff a given server has less regions than the balanced amount 488 */ 489 public boolean serverHasTooFewRegions(int server) { 490 int minLoad = this.numRegions / numServers; 491 int numRegions = getNumRegions(server); 492 return numRegions < minLoad; 493 } 494 495 /** 496 * Retrieves and lazily initializes a field storing the locality of every region/server 497 * combination 498 */ 499 public float[][] getOrComputeRackLocalities() { 500 if (rackLocalities == null || regionsToMostLocalEntities == null) { 501 computeCachedLocalities(); 502 } 503 return rackLocalities; 504 } 505 506 /** 507 * Lazily initializes and retrieves a mapping of region -> server for which region has the highest 508 * the locality 509 */ 510 public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) { 511 if (rackLocalities == null || regionsToMostLocalEntities == null) { 512 computeCachedLocalities(); 513 } 514 return regionsToMostLocalEntities[type.ordinal()]; 515 } 516 517 /** 518 * Looks up locality from cache of localities. Will create cache if it does not already exist. 519 */ 520 public float getOrComputeLocality(int region, int entity, 521 BalancerClusterState.LocalityType type) { 522 switch (type) { 523 case SERVER: 524 return getLocalityOfRegion(region, entity); 525 case RACK: 526 return getOrComputeRackLocalities()[region][entity]; 527 default: 528 throw new IllegalArgumentException("Unsupported LocalityType: " + type); 529 } 530 } 531 532 /** 533 * Returns locality weighted by region size in MB. Will create locality cache if it does not 534 * already exist. 535 */ 536 public double getOrComputeWeightedLocality(int region, int server, 537 BalancerClusterState.LocalityType type) { 538 return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); 539 } 540 541 /** 542 * Returns the size in MB from the most recent RegionLoad for region 543 */ 544 public int getRegionSizeMB(int region) { 545 Deque<BalancerRegionLoad> load = regionLoads[region]; 546 // This means regions have no actual data on disk 547 if (load == null) { 548 return 0; 549 } 550 return load.getLast().getStorefileSizeMB(); 551 } 552 553 /** 554 * Finds and return the sum of latest reported cache ratio and cold data ratio for the region on 555 * the RegionServer it's currently online. 556 */ 557 float getSumRegionCacheAndColdDataRatio(int region) { 558 Deque<BalancerRegionLoad> dq = regionLoads[region]; 559 if (dq == null || dq.isEmpty()) { 560 return 0.0f; 561 } 562 BalancerRegionLoad load = dq.getLast(); 563 return load.getCurrentRegionCacheRatio() + load.getRegionColdDataRatio(); 564 } 565 566 int getRegionSizeMinusColdDataMB(int region) { 567 Deque<BalancerRegionLoad> dq = regionLoads[region]; 568 if (dq == null || dq.isEmpty()) { 569 return 0; 570 } 571 BalancerRegionLoad load = dq.getLast(); 572 return load.getRegionSizeMB() - (int) (load.getRegionSizeMB() * load.getRegionColdDataRatio()); 573 } 574 575 /** 576 * Computes and caches the locality for each region/rack combinations, as well as storing a 577 * mapping of region -> server and region -> rack such that server and rack have the highest 578 * locality for region 579 */ 580 private void computeCachedLocalities() { 581 rackLocalities = new float[numRegions][numRacks]; 582 regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; 583 584 // Compute localities and find most local server per region 585 for (int region = 0; region < numRegions; region++) { 586 int serverWithBestLocality = 0; 587 float bestLocalityForRegion = 0; 588 for (int server = 0; server < numServers; server++) { 589 // Aggregate per-rack locality 590 float locality = getLocalityOfRegion(region, server); 591 int rack = serverIndexToRackIndex[server]; 592 int numServersInRack = serversPerRack[rack].length; 593 rackLocalities[region][rack] += locality / numServersInRack; 594 595 if (locality > bestLocalityForRegion) { 596 serverWithBestLocality = server; 597 bestLocalityForRegion = locality; 598 } 599 } 600 regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; 601 602 // Find most local rack per region 603 int rackWithBestLocality = 0; 604 float bestRackLocalityForRegion = 0.0f; 605 for (int rack = 0; rack < numRacks; rack++) { 606 float rackLocality = rackLocalities[region][rack]; 607 if (rackLocality > bestRackLocalityForRegion) { 608 bestRackLocalityForRegion = rackLocality; 609 rackWithBestLocality = rack; 610 } 611 } 612 regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; 613 } 614 615 } 616 617 /** 618 * Returns the weighted cache ratio of a region on the given region server 619 */ 620 public float getOrComputeWeightedRegionCacheRatio(int region, int server) { 621 return getRegionSizeMinusColdDataMB(region) * getOrComputeRegionCacheRatio(region, server); 622 } 623 624 /** 625 * Returns the amount by which a region is cached on a given region server. If the region is not 626 * currently hosted on the given region server, then find out if it was previously hosted there 627 * and return the old cache ratio. 628 */ 629 protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) { 630 float regionCacheRatio = 0.0f; 631 632 // Get the current region cache ratio if the region is hosted on the server regionServerIndex 633 for (int regionIndex : regionsPerServer[regionServerIndex]) { 634 if (region != regionIndex) { 635 continue; 636 } 637 638 Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex]; 639 640 // The region is currently hosted on this region server. Get the region cache ratio for this 641 // region on this server 642 regionCacheRatio = 643 regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio(); 644 645 return regionCacheRatio; 646 } 647 648 // Region is not currently hosted on this server. Check if the region was cached on this 649 // server earlier. This can happen when the server was shutdown and the cache was persisted. 650 // Search using the region name and server name and not the index id and server id as these ids 651 // may change when a server is marked as dead or a new server is added. 652 String regionEncodedName = regions[region].getEncodedName(); 653 ServerName serverName = servers[regionServerIndex]; 654 if ( 655 regionCacheRatioOnOldServerMap != null 656 && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName) 657 ) { 658 Pair<ServerName, Float> cacheRatioOfRegionOnServer = 659 regionCacheRatioOnOldServerMap.get(regionEncodedName); 660 if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) { 661 regionCacheRatio = cacheRatioOfRegionOnServer.getSecond(); 662 if (LOG.isDebugEnabled()) { 663 LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName, 664 serverName, regionCacheRatio); 665 } 666 } 667 } 668 return regionCacheRatio; 669 } 670 671 /** 672 * Populate the maps containing information about how much a region is cached on a region server. 673 */ 674 private void computeRegionServerRegionCacheRatio() { 675 regionIndexServerIndexRegionCachedRatio = new HashMap<>(); 676 regionServerIndexWithBestRegionCachedRatio = new int[numRegions]; 677 678 for (int region = 0; region < numRegions; region++) { 679 float bestRegionCacheRatio = 0.0f; 680 int serverWithBestRegionCacheRatio = 0; 681 for (int server = 0; server < numServers; server++) { 682 float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server); 683 if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) { 684 // A region with cache ratio 0 on a server means nothing. Hence, just make a note of 685 // cache ratio only if the cache ratio is greater than 0. 686 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 687 regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio); 688 } 689 if (regionCacheRatio > bestRegionCacheRatio) { 690 serverWithBestRegionCacheRatio = server; 691 // If the server currently hosting the region has equal cache ratio to a historical 692 // server, consider the current server to keep hosting the region 693 bestRegionCacheRatio = regionCacheRatio; 694 } else if ( 695 regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region] 696 ) { 697 // If two servers have same region cache ratio, then the server currently hosting the 698 // region 699 // should retain the region 700 serverWithBestRegionCacheRatio = server; 701 } 702 } 703 regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio; 704 Pair<Integer, Integer> regionServerPair = 705 new Pair<>(region, regionIndexToServerIndex[region]); 706 float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair); 707 if (tempRegionCacheRatio > bestRegionCacheRatio) { 708 LOG.warn( 709 "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the " 710 + "best region cache ratio {} on server {}", 711 regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]], 712 tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]); 713 } 714 } 715 } 716 717 protected float getOrComputeRegionCacheRatio(int region, int server) { 718 if ( 719 regionServerIndexWithBestRegionCachedRatio == null 720 || regionIndexServerIndexRegionCachedRatio.isEmpty() 721 ) { 722 computeRegionServerRegionCacheRatio(); 723 } 724 725 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 726 return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair) 727 ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair) 728 : 0.0f; 729 } 730 731 public int[] getOrComputeServerWithBestRegionCachedRatio() { 732 if ( 733 regionServerIndexWithBestRegionCachedRatio == null 734 || regionIndexServerIndexRegionCachedRatio.isEmpty() 735 ) { 736 computeRegionServerRegionCacheRatio(); 737 } 738 return regionServerIndexWithBestRegionCachedRatio; 739 } 740 741 /** 742 * Finds and return the latest reported cache ratio for the region on the RegionServer it's 743 * currently online. 744 */ 745 float getObservedRegionCacheRatio(int region) { 746 Deque<BalancerRegionLoad> dq = regionLoads[region]; 747 if (dq == null || dq.isEmpty()) { 748 return 0.0f; 749 } 750 return dq.getLast().getCurrentRegionCacheRatio(); 751 } 752 753 /** 754 * Maps region index to rack index 755 */ 756 public int getRackForRegion(int region) { 757 return serverIndexToRackIndex[regionIndexToServerIndex[region]]; 758 } 759 760 enum LocalityType { 761 SERVER, 762 RACK 763 } 764 765 public void doAction(BalanceAction action) { 766 switch (action.getType()) { 767 case NULL: 768 break; 769 case ASSIGN_REGION: 770 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings 771 assert action instanceof AssignRegionAction : action.getClass(); 772 AssignRegionAction ar = (AssignRegionAction) action; 773 regionsPerServer[ar.getServer()] = 774 addRegion(regionsPerServer[ar.getServer()], ar.getRegion()); 775 regionMoved(ar.getRegion(), -1, ar.getServer()); 776 break; 777 case MOVE_REGION: 778 assert action instanceof MoveRegionAction : action.getClass(); 779 MoveRegionAction mra = (MoveRegionAction) action; 780 regionsPerServer[mra.getFromServer()] = 781 removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion()); 782 regionsPerServer[mra.getToServer()] = 783 addRegion(regionsPerServer[mra.getToServer()], mra.getRegion()); 784 regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); 785 break; 786 case SWAP_REGIONS: 787 assert action instanceof SwapRegionsAction : action.getClass(); 788 SwapRegionsAction a = (SwapRegionsAction) action; 789 regionsPerServer[a.getFromServer()] = 790 replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion()); 791 regionsPerServer[a.getToServer()] = 792 replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion()); 793 regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); 794 regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); 795 break; 796 case MOVE_BATCH: 797 assert action instanceof MoveBatchAction : action.getClass(); 798 MoveBatchAction mba = (MoveBatchAction) action; 799 for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) { 800 Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex); 801 regionsPerServer[serverIndex] = 802 removeRegions(regionsPerServer[serverIndex], regionsToRemove); 803 } 804 for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) { 805 Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex); 806 regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd); 807 } 808 for (MoveRegionAction moveRegionAction : mba.getMoveActions()) { 809 regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(), 810 moveRegionAction.getToServer()); 811 } 812 break; 813 default: 814 throw new RuntimeException("Unknown action:" + action.getType()); 815 } 816 } 817 818 /** 819 * Return true if the placement of region on server would lower the availability of the region in 820 * question 821 * @return true or false 822 */ 823 boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { 824 if (!serversToIndex.containsKey(serverName.getAddress())) { 825 return false; // safeguard against race between cluster.servers and servers from LB method 826 // args 827 } 828 int server = serversToIndex.get(serverName.getAddress()); 829 int region = regionsToIndex.get(regionInfo); 830 831 // Region replicas for same region should better assign to different servers 832 for (int i : regionsPerServer[server]) { 833 RegionInfo otherRegionInfo = regions[i]; 834 if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) { 835 return true; 836 } 837 } 838 839 int primary = regionIndexToPrimaryIndex[region]; 840 if (primary == -1) { 841 return false; 842 } 843 // there is a subset relation for server < host < rack 844 // check server first 845 int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary); 846 if (result != 0) { 847 return result > 0; 848 } 849 850 // check host 851 if (multiServersPerHost) { 852 result = checkLocationForPrimary(serverIndexToHostIndex[server], 853 colocatedReplicaCountsPerHost, primary); 854 if (result != 0) { 855 return result > 0; 856 } 857 } 858 859 // check rack 860 if (numRacks > 1) { 861 result = checkLocationForPrimary(serverIndexToRackIndex[server], 862 colocatedReplicaCountsPerRack, primary); 863 if (result != 0) { 864 return result > 0; 865 } 866 } 867 return false; 868 } 869 870 /** 871 * Common method for better solution check. 872 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 873 * colocatedReplicaCountsPerRack 874 * @return 1 for better, -1 for no better, 0 for unknown 875 */ 876 private int checkLocationForPrimary(int location, 877 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) { 878 if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) { 879 // check for whether there are other Locations that we can place this region 880 for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) { 881 if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) { 882 return 1; // meaning there is a better Location 883 } 884 } 885 return -1; // there is not a better Location to place this 886 } 887 return 0; 888 } 889 890 void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { 891 if (!serversToIndex.containsKey(serverName.getAddress())) { 892 return; 893 } 894 int server = serversToIndex.get(serverName.getAddress()); 895 int region = regionsToIndex.get(regionInfo); 896 doAction(new AssignRegionAction(region, server)); 897 } 898 899 void regionMoved(int region, int oldServer, int newServer) { 900 regionIndexToServerIndex[region] = newServer; 901 if (initialRegionIndexToServerIndex[region] == newServer) { 902 numMovedRegions--; // region moved back to original location 903 } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { 904 numMovedRegions++; // region moved from original location 905 } 906 int tableIndex = regionIndexToTableIndex[region]; 907 if (oldServer >= 0) { 908 numRegionsPerServerPerTable[tableIndex][oldServer]--; 909 } 910 numRegionsPerServerPerTable[tableIndex][newServer]++; 911 912 // update for servers 913 int primary = regionIndexToPrimaryIndex[region]; 914 if (oldServer >= 0) { 915 colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary); 916 } 917 colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary); 918 919 // update for hosts 920 if (multiServersPerHost) { 921 updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost, 922 oldServer, newServer, primary, region); 923 } 924 925 // update for racks 926 if (numRacks > 1) { 927 updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack, 928 oldServer, newServer, primary, region); 929 } 930 } 931 932 /** 933 * Common method for per host and per Location region index updates when a region is moved. 934 * @param serverIndexToLocation serverIndexToHostIndex or serverIndexToLocationIndex 935 * @param regionsPerLocation regionsPerHost or regionsPerLocation 936 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 937 * colocatedReplicaCountsPerRack 938 */ 939 private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation, 940 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer, 941 int primary, int region) { 942 int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1; 943 int newLocation = serverIndexToLocation[newServer]; 944 if (newLocation != oldLocation) { 945 regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region); 946 colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary); 947 if (oldLocation >= 0) { 948 regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region); 949 colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary); 950 } 951 } 952 953 } 954 955 int[] removeRegion(int[] regions, int regionIndex) { 956 // TODO: this maybe costly. Consider using linked lists 957 int[] newRegions = new int[regions.length - 1]; 958 int i = 0; 959 for (i = 0; i < regions.length; i++) { 960 if (regions[i] == regionIndex) { 961 break; 962 } 963 newRegions[i] = regions[i]; 964 } 965 System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i); 966 return newRegions; 967 } 968 969 int[] addRegion(int[] regions, int regionIndex) { 970 int[] newRegions = new int[regions.length + 1]; 971 System.arraycopy(regions, 0, newRegions, 0, regions.length); 972 newRegions[newRegions.length - 1] = regionIndex; 973 return newRegions; 974 } 975 976 int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) { 977 // Calculate the size of the new regions array 978 int newSize = regions.length - regionIndicesToRemove.size(); 979 if (newSize < 0) { 980 throw new IllegalStateException( 981 "Region indices mismatch: more regions to remove than in the regions array"); 982 } 983 984 int[] newRegions = new int[newSize]; 985 int newIndex = 0; 986 987 // Copy only the regions not in the removal set 988 for (int region : regions) { 989 if (!regionIndicesToRemove.contains(region)) { 990 newRegions[newIndex++] = region; 991 } 992 } 993 994 // If the newIndex is smaller than newSize, some regions were missing from the input array 995 if (newIndex != newSize) { 996 throw new IllegalStateException("Region indices mismatch: some regions in the removal " 997 + "set were not found in the regions array"); 998 } 999 1000 return newRegions; 1001 } 1002 1003 int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) { 1004 int[] newRegions = new int[regions.length + regionIndicesToAdd.size()]; 1005 1006 // Copy the existing regions to the new array 1007 System.arraycopy(regions, 0, newRegions, 0, regions.length); 1008 1009 // Add the new regions at the end of the array 1010 int newIndex = regions.length; 1011 for (int regionIndex : regionIndicesToAdd) { 1012 newRegions[newIndex++] = regionIndex; 1013 } 1014 1015 return newRegions; 1016 } 1017 1018 List<Integer> getShuffledServerIndices() { 1019 return shuffledServerIndicesSupplier.get(); 1020 } 1021 1022 int[] addRegionSorted(int[] regions, int regionIndex) { 1023 int[] newRegions = new int[regions.length + 1]; 1024 int i = 0; 1025 for (i = 0; i < regions.length; i++) { // find the index to insert 1026 if (regions[i] > regionIndex) { 1027 break; 1028 } 1029 } 1030 System.arraycopy(regions, 0, newRegions, 0, i); // copy first half 1031 System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half 1032 newRegions[i] = regionIndex; 1033 1034 return newRegions; 1035 } 1036 1037 int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { 1038 int i = 0; 1039 for (i = 0; i < regions.length; i++) { 1040 if (regions[i] == regionIndex) { 1041 regions[i] = newRegionIndex; 1042 break; 1043 } 1044 } 1045 return regions; 1046 } 1047 1048 void sortServersByRegionCount() { 1049 Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); 1050 } 1051 1052 int getNumRegions(int server) { 1053 return regionsPerServer[server].length; 1054 } 1055 1056 boolean contains(int[] arr, int val) { 1057 return Arrays.binarySearch(arr, val) >= 0; 1058 } 1059 1060 private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); 1061 1062 public Comparator<Integer> getNumRegionsComparator() { 1063 return numRegionsComparator; 1064 } 1065 1066 int getLowestLocalityRegionOnServer(int serverIndex) { 1067 if (regionFinder != null) { 1068 float lowestLocality = 1.0f; 1069 int lowestLocalityRegionIndex = -1; 1070 if (regionsPerServer[serverIndex].length == 0) { 1071 // No regions on that region server 1072 return -1; 1073 } 1074 for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { 1075 int regionIndex = regionsPerServer[serverIndex][j]; 1076 HDFSBlocksDistribution distribution = 1077 regionFinder.getBlockDistribution(regions[regionIndex]); 1078 float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); 1079 // skip empty region 1080 if (distribution.getUniqueBlocksTotalWeight() == 0) { 1081 continue; 1082 } 1083 if (locality < lowestLocality) { 1084 lowestLocality = locality; 1085 lowestLocalityRegionIndex = j; 1086 } 1087 } 1088 if (lowestLocalityRegionIndex == -1) { 1089 return -1; 1090 } 1091 if (LOG.isTraceEnabled()) { 1092 LOG.trace("Lowest locality region is " 1093 + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]] 1094 .getRegionNameAsString() 1095 + " with locality " + lowestLocality + " and its region server contains " 1096 + regionsPerServer[serverIndex].length + " regions"); 1097 } 1098 return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; 1099 } else { 1100 return -1; 1101 } 1102 } 1103 1104 float getLocalityOfRegion(int region, int server) { 1105 if (regionFinder != null) { 1106 HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); 1107 return distribution.getBlockLocalityIndex(servers[server].getHostname()); 1108 } else { 1109 return 0f; 1110 } 1111 } 1112 1113 void setNumRegions(int numRegions) { 1114 this.numRegions = numRegions; 1115 } 1116 1117 void setNumMovedRegions(int numMovedRegions) { 1118 this.numMovedRegions = numMovedRegions; 1119 } 1120 1121 public int getMaxReplicas() { 1122 return maxReplicas; 1123 } 1124 1125 void setStopRequestedAt(long stopRequestedAt) { 1126 this.stopRequestedAt = stopRequestedAt; 1127 } 1128 1129 boolean isStopRequested() { 1130 return EnvironmentEdgeManager.currentTime() > stopRequestedAt; 1131 } 1132 1133 Deque<BalancerRegionLoad>[] getRegionLoads() { 1134 return regionLoads; 1135 } 1136 1137 @Override 1138 public String toString() { 1139 StringBuilder desc = new StringBuilder("Cluster={servers=["); 1140 for (ServerName sn : servers) { 1141 desc.append(sn.getAddress().toString()).append(", "); 1142 } 1143 desc.append("], serverIndicesSortedByRegionCount=") 1144 .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=") 1145 .append(Arrays.deepToString(regionsPerServer)); 1146 1147 desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) 1148 .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions) 1149 .append('}'); 1150 return desc.toString(); 1151 } 1152}