/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;

import java.math.BigDecimal;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions
 * based on the amount they are cached on a given server. A region can move across the region
 * servers whenever a region server shuts down or crashes. The region server preserves the cache
 * periodically and restores the cache when it is restarted. This balancer implements a mechanism
 * where it maintains the amount by which a region is cached on a region server. During balancer
 * run, a region plan is generated that takes into account this cache information and tries to move
 * the regions so that the cache is minimally impacted.
 */
@InterfaceAudience.Private
public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);

  public static final String CACHE_RATIO_THRESHOLD =
    "hbase.master.balancer.stochastic.throttling.cacheRatio";
  public static final float CACHE_RATIO_THRESHOLD_DEFAULT = 0.8f;

  /**
   * Below this cache ratio on the current host, a move may be considered for the free-space
   * heuristic.
   */
  public static final String LOW_CACHE_RATIO_FOR_RELOCATION_KEY =
    "hbase.master.balancer.cacheaware.lowCacheRatioThreshold";
  public static final float LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT = 0.35f;

  /**
   * Optimistic region cache ratio assumed for cost purposes when a better host has free cache space
   * (actual warmup is not modeled).
   */
  public static final String POTENTIAL_CACHE_RATIO_AFTER_MOVE_KEY =
    "hbase.master.balancer.cacheaware.potentialCacheRatioAfterMove";
  public static final float POTENTIAL_CACHE_RATIO_AFTER_MOVE_DEFAULT = 0.95f;

  /**
   * Minimum free block cache on a target server, as a multiple of the region's on-disk size in
   * bytes, required to count that server as a relocation opportunity.
   */
  public static final String MIN_FREE_CACHE_SPACE_FACTOR_KEY =
    "hbase.master.balancer.cacheaware.minFreeCacheSpaceFactor";
  public static final float MIN_FREE_CACHE_SPACE_FACTOR_DEFAULT = 1.0f;

  /** Cache ratio on the destination at or above which a move is not throttled. */
  public Float ratioThreshold;

  /** Throttle duration, in milliseconds, returned for moves that are not exempt. */
  private Long sleepTime;
  private Configuration configuration;

  private float lowCacheRatioThreshold;
  private float potentialCacheRatioAfterMove;
  private float minFreeCacheSpaceFactor;

  /**
   * Normalized cache ratio delta applied by the last simulated move. The candidate generators reset
   * it to {@link BigDecimal#ZERO} before proposing an action, and
   * {@link CacheAwareCostFunction#regionMoved(int, int, int)} uses it to tell a fresh move apart
   * from the undo of the previous one.
   */
  private BigDecimal simulatedRatio = BigDecimal.ZERO;

  /**
   * Loads the balancer settings, including the cache-aware thresholds, from the configuration.
   * @param configuration the configuration to read the settings from
   */
  @Override
  public void loadConf(Configuration configuration) {
    this.configuration = configuration;
    this.costFunctions = new ArrayList<>();
    super.loadConf(configuration);
    ratioThreshold =
      this.configuration.getFloat(CACHE_RATIO_THRESHOLD, CACHE_RATIO_THRESHOLD_DEFAULT);
    sleepTime = configuration.getLong(MOVE_THROTTLING, MOVE_THROTTLING_DEFAULT.toMillis());
    lowCacheRatioThreshold = configuration.getFloat(LOW_CACHE_RATIO_FOR_RELOCATION_KEY,
      LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT);
    potentialCacheRatioAfterMove = configuration.getFloat(POTENTIAL_CACHE_RATIO_AFTER_MOVE_KEY,
      POTENTIAL_CACHE_RATIO_AFTER_MOVE_DEFAULT);
    minFreeCacheSpaceFactor =
      configuration.getFloat(MIN_FREE_CACHE_SPACE_FACTOR_KEY, MIN_FREE_CACHE_SPACE_FACTOR_DEFAULT);
  }

  /**
   * Registers the two cache-aware candidate generators, keyed by their own class.
   */
  @Override
  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
    createCandidateGenerators(Configuration conf) {
    Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
      new HashMap<>(2);
    candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class,
      new CacheAwareSkewnessCandidateGenerator());
    candidateGenerators.put(CacheAwareCandidateGenerator.class, new CacheAwareCandidateGenerator());
    return candidateGenerators;
  }

  /**
   * Builds the list of cost functions used by this balancer. Functions whose multiplier is not
   * positive are left out.
   */
  @Override
  protected List<CostFunction> createCostFunctions(Configuration configuration) {
    List<CostFunction> costFunctions = new ArrayList<>();
    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
    return costFunctions;
  }

  /** Adds the cost function to the list only when its multiplier is greater than zero. */
  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
    if (costFunction.getMultiplier() > 0) {
      costFunctions.add(costFunction);
    }
  }

  /**
   * Stores the latest cluster metrics and rebuilds the region load and cache ratio maps from them.
   */
  @Override
  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
    this.clusterStatus = clusterMetrics;
    updateRegionLoad();
  }

  /**
   * Returns the free block cache size reported by each live server.
   * @return a map from server to its reported free cache size, or {@code null} when no cluster
   *         metrics have been received yet
   */
  protected Map<ServerName, Long> getServerBlockCacheFreeBytes() {
    if (clusterStatus == null) {
      return null;
    }
    Map<ServerName, Long> map = new HashMap<>();
    clusterStatus.getLiveServerMetrics().forEach((sn, sm) -> map.put(sn, sm.getCacheFreeSize()));
    return map;
  }

  /**
   * Creates the cluster state, handing it the historical cache ratios and the per-server free block
   * cache sizes in addition to the usual inputs.
   */
  @Override
  protected BalancerClusterState createState(Map<ServerName, List<RegionInfo>> clusterState,
    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder finder,
    RackManager rackManager) {
    return new BalancerClusterState(clusterState, loads, finder, rackManager,
      regionCacheRatioOnOldServerMap, getServerBlockCacheFreeBytes());
  }

  /**
   * Collect the amount of region cached for all the regions from all the active region servers.
   */
  private void updateRegionLoad() {
    loads = new HashMap<>();
    regionCacheRatioOnOldServerMap = new HashMap<>();
    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();

    // Build current region cache statistics
    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
      // Create a map of region and the server where it is currently hosted
      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
        String regionEncodedName = RegionInfo.encodeRegionName(regionName);

        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();

        // Get the total size of the hFiles in this region
        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);

        rload.add(new BalancerRegionLoad(rm));
        // Maintain a map of region and its total size. This is needed to calculate the cache
        // ratios for the regions cached on old region servers
        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
        loads.put(regionEncodedName, rload);
      });
    });

    // Build cache statistics for the regions hosted previously on old region servers
    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
      // Find if a region was previously hosted on a server other than the one it is currently
      // hosted on.
      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
        // Only regions that are currently hosted on some live server are of interest. Entries are
        // never mapped to null, so a null lookup means the region is not currently hosted.
        Pair<ServerName, Integer> currentHosting =
          regionCacheRatioOnCurrentServerMap.get(regionEncodedName);
        if (currentHosting == null) {
          return;
        }
        // The server reporting the cached data is the current host: nothing to record.
        if (ServerName.isSameAddress(currentHosting.getFirst(), sn)) {
          return;
        }
        int regionSizeMB = currentHosting.getSecond();
        // The coldDataSize accounts for data size classified as "cold" by DataTieringManager,
        // which should be kept out of cache. We calculate cache ratio on old server based
        // only on the hot data size for the region (regionSizeMB - coldDataSize), as we
        // don't want to move regions with low cache ratio due to data classified as cold.
        int coldDataSize = sm.getRegionColdDataSize().getOrDefault(regionEncodedName, 0);
        int hotDataSizeMB = regionSizeMB - coldDataSize;
        float regionCacheRatioOnOldServer =
          hotDataSizeMB <= 0 ? 0.0f : (float) regionSizeInCache / hotDataSizeMB;
        regionCacheRatioOnOldServerMap.put(regionEncodedName,
          new Pair<>(sn, regionCacheRatioOnOldServer));
      });
    });
  }

  /**
   * Looks up a region of the given cluster state by its encoded name.
   * @return the matching region, or {@code null} when the cluster state has no such region
   */
  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
    Optional<RegionInfo> regionInfoOptional = Arrays.stream(cluster.regions)
      .filter((RegionInfo ri) -> regionName.equals(ri.getEncodedName())).findFirst();
    return regionInfoOptional.orElse(null);
  }

  /**
   * Returns how long the given move should be throttled. A move is not throttled when its
   * destination is the server recorded as the region's old host and the cache ratio recorded for
   * that server is at least {@link #ratioThreshold}; every other move is throttled for the
   * configured sleep time.
   * @param plan the region move being executed
   * @return the throttle duration in milliseconds, {@code 0} for no throttling
   */
  @Override
  public long getThrottleDurationMs(RegionPlan plan) {
    Pair<ServerName, Float> rsRatio = this.regionCacheRatioOnOldServerMap.get(plan.getRegionName());
    if (
      rsRatio != null && plan.getDestination().equals(rsRatio.getFirst())
        && rsRatio.getSecond() >= ratioThreshold
    ) {
      LOG.debug("Moving region {} to server {} with cache ratio {}. No throttling needed.",
        plan.getRegionInfo().getEncodedName(), plan.getDestination(), rsRatio.getSecond());
      return 0L;
    }
    if (rsRatio != null) {
      LOG.debug("Moving region {} to server {} with cache ratio: {}. Throttling move for {}ms.",
        plan.getRegionInfo().getEncodedName(), plan.getDestination(),
        plan.getDestination().equals(rsRatio.getFirst()) ? rsRatio.getSecond() : "unknown",
        sleepTime);
    } else {
      LOG.debug(
        "Moving region {} to server {} with no cache ratio info for the region. "
          + "Throttling move for {}ms.",
        plan.getRegionInfo().getEncodedName(), plan.getDestination(), sleepTime);
    }
    return sleepTime;
  }

  /**
   * Balances a table and orders the resulting plans so that moves towards a server with the highest
   * recorded cache ratio for the region come first.
   * @return the sorted plans, or {@code null} when the parent produced no plans
   */
  @Override
  protected List<RegionPlan> balanceTable(TableName tableName,
    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
    // The candidate generators remove entries from the map while balancing, so take a copy first.
    final Map<String, Pair<ServerName, Float>> snapshot =
      new HashMap<>(this.regionCacheRatioOnOldServerMap);
    List<RegionPlan> plans = super.balanceTable(tableName, loadOfOneTable);
    if (plans == null) {
      return plans;
    }
    // Descending order of the cache ratio recorded on the plan's destination.
    plans.sort((p1, p2) -> Float.compare(cacheRatioOnDestination(snapshot, p2),
      cacheRatioOnDestination(snapshot, p1)));
    return plans;
  }

  /**
   * Returns the cache ratio recorded for the plan's region on the plan's destination, or {@code 0}
   * when nothing is recorded for the region or the recorded server is not the destination.
   */
  private static float cacheRatioOnDestination(Map<String, Pair<ServerName, Float>> snapshot,
    RegionPlan plan) {
    Pair<ServerName, Float> recorded = snapshot.get(plan.getRegionName());
    if (recorded == null || !recorded.getFirst().equals(plan.getDestination())) {
      return 0f;
    }
    return recorded.getSecond();
  }

  /**
   * Proposes moving regions back to the server they were previously cached on and, once there are
   * no such regions left, moving poorly cached regions to servers with free block cache.
   */
  private class CacheAwareCandidateGenerator extends CandidateGenerator {
    @Override
    protected BalanceAction generate(BalancerClusterState cluster) {
      // Mark that the next regionMoved() callback is a fresh move, not an undo.
      simulatedRatio = BigDecimal.ZERO;
      // Move the regions to the servers they were previously hosted on based on the cache ratio
      if (regionCacheRatioOnOldServerMap.isEmpty()) {
        return generatePlanForFreeCacheSpace(cluster);
      }
      Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap =
        regionCacheRatioOnOldServerMap.entrySet().iterator().next();
      // Get the server where this region was previously hosted
      String regionEncodedName = regionCacheRatioServerMap.getKey();
      ServerName oldServer = regionCacheRatioServerMap.getValue().getFirst();
      RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
      if (regionInfo == null) {
        LOG.warn("Region {} not found", regionEncodedName);
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      int regionIndex = cluster.regionsToIndex.get(regionInfo);
      // Keep the lookup result boxed: unboxing a missing mapping would throw before the
      // "server not found" branch could run.
      Integer oldServerIndex = cluster.serversToIndex.get(oldServer.getAddress());
      if (oldServerIndex == null || oldServerIndex < 0) {
        LOG.warn("Server previously hosting region {} not found", regionEncodedName);
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }

      float oldRegionCacheRatio = cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
      int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
      float currentRegionCacheRatio =
        cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);

      BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
        currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
      regionCacheRatioOnOldServerMap.remove(regionEncodedName);
      return action;
    }

    /**
     * Picks, at random, one move of a poorly cached user region to another server that has enough
     * free block cache for it.
     * @return the chosen move, or {@link BalanceAction#NULL_ACTION} when free cache sizes are
     *         unavailable or no region qualifies
     */
    private BalanceAction generatePlanForFreeCacheSpace(BalancerClusterState cluster) {
      if (cluster.serverBlockCacheFreeSize == null) {
        return BalanceAction.NULL_ACTION;
      }
      List<BalanceAction> possibleActions = new ArrayList<>();
      // Free cache left on each server after accounting for the candidate moves collected so far.
      Map<Integer, Long> serverFreeCacheAfterAction = new HashMap<>();
      for (int region = 0; region < cluster.numRegions; region++) {
        RegionInfo regionInfo = cluster.regions[region];
        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
          continue;
        }
        int currentServer = cluster.regionIndexToServerIndex[region];
        float ratio = cluster.getSumRegionCacheAndColdDataRatio(region);
        if (ratio >= lowCacheRatioThreshold) {
          continue;
        }
        int regionSizeMb = cluster.getRegionSizeMinusColdDataMB(region);
        if (regionSizeMb <= 0) {
          continue;
        }
        long bytesNeeded = (long) (regionSizeMb * 1024L * 1024L * minFreeCacheSpaceFactor);
        for (int server = 0; server < cluster.numServers; server++) {
          // Skips current server for region, as we can't generate a move to same server
          if (server == currentServer) {
            continue;
          }
          serverFreeCacheAfterAction.putIfAbsent(server, cluster.serverBlockCacheFreeSize[server]);
          if (serverFreeCacheAfterAction.get(server) >= bytesNeeded) {
            serverFreeCacheAfterAction.compute(server, (s, freeCache) -> freeCache - bytesNeeded);
            possibleActions.add(getAction(currentServer, region, server, -1));
          }
        }
      }
      if (possibleActions.isEmpty()) {
        return BalanceAction.NULL_ACTION;
      }
      BalanceAction action =
        possibleActions.get(ThreadLocalRandom.current().nextInt(possibleActions.size()));
      if (LOG.isDebugEnabled()) {
        int chosenRegion = ((MoveRegionAction) action).getRegion();
        LOG.debug("region {} had sum ratio {}", cluster.regions[chosenRegion].getEncodedName(),
          cluster.getSumRegionCacheAndColdDataRatio(chosenRegion));
      }
      return action;
    }

    /**
     * Returns a move back to the old server when {@link #moveRegionToOldServer} approves it, and
     * otherwise falls back to the free-cache-space heuristic.
     */
    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
      float cacheRatioOnOldServer) {
      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
          : generatePlanForFreeCacheSpace(cluster);
    }

    /**
     * Decides whether the region should go back to the server it was previously cached on.
     * @return {@code true} when the region is fully cached on the old server, equally cached on
     *         both servers, or cached on the current server at less than 60% of the old server's
     *         ratio
     */
    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
      float cacheRatioOnOldServer) {
      // Find if the region has already moved by comparing the current server index with the
      // old server index. This can happen when other candidate generator has moved the region
      if (currentServerIndex < 0 || oldServerIndex < 0) {
        return false;
      }

      float cacheRatioDiffThreshold = 0.6f;

      // Conditions for moving the region

      // If the region is fully cached on the old server, move the region back
      if (cacheRatioOnOldServer == 1.0f) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
        }
        return true;
      }

      // Move the region back to the old server if it is cached equally on both the servers
      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
        }
        return true;
      }

      // If the region is not fully cached on either of the servers, move the region back to the
      // old server if the region cache ratio on the current server is still much less than the old
      // server
      if (
        cacheRatioOnOldServer > 0.0f
          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold
      ) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
              + "cache ratio {}",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
            cluster.servers[oldServerIndex], cacheRatioOnOldServer, cacheRatioOnCurrentServer);
        }
        return true;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
      }
      return false;
    }
  }

  /**
   * Region-count driven generator that first sends regions back to the server they were previously
   * hosted on and afterwards moves the least cached region of the picked server.
   */
  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
    @Override
    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
      // Mark that the next regionMoved() callback is a fresh move, not an undo.
      simulatedRatio = BigDecimal.ZERO;
      // First move all the regions which were hosted previously on some other server back to their
      // old servers
      if (!regionCacheRatioOnOldServerMap.isEmpty()) {
        // Get the first region index in the historical cache ratio list
        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
        String regionEncodedName = regionEntry.getKey();

        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
        if (regionInfo == null) {
          LOG.warn("Region {} does not exist", regionEncodedName);
          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
          return BalanceAction.NULL_ACTION;
        }
        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
          return BalanceAction.NULL_ACTION;
        }

        int regionIndex = cluster.regionsToIndex.get(regionInfo);

        // Get the current host name for this region
        thisServer = cluster.regionIndexToServerIndex[regionIndex];

        // Get the old server index. Keep the lookup result boxed: unboxing a missing mapping
        // would throw before the "server does not exist" branch could run.
        Integer oldServerIndex =
          cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress());

        regionCacheRatioOnOldServerMap.remove(regionEncodedName);

        if (oldServerIndex == null || oldServerIndex < 0) {
          // The old server has been moved to other host and hence, the region cannot be moved back
          // to the old server
          if (LOG.isDebugEnabled()) {
            LOG.debug(
              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
                + "server {} as the server does not exist",
              regionEncodedName, regionEntry.getValue().getFirst().getHostname());
          }
          return BalanceAction.NULL_ACTION;
        }
        otherServer = oldServerIndex;

        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
              + "was hosted there earlier",
            regionEncodedName, cluster.servers[thisServer].getHostname(),
            cluster.servers[otherServer].getHostname());
        }

        return getAction(thisServer, regionIndex, otherServer, -1);
      }

      if (thisServer < 0 || otherServer < 0) {
        return BalanceAction.NULL_ACTION;
      }

      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
      if (regionIndexToMove < 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
        }
        return BalanceAction.NULL_ACTION;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
            + "least cached on current server",
          cluster.regions[regionIndexToMove].getEncodedName(),
          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
      }
      return getAction(thisServer, regionIndexToMove, otherServer, -1);
    }

    /**
     * Returns the index of the region with the lowest cache ratio on the given server, or
     * {@code -1} when the server hosts no regions. The first region wins on ties.
     */
    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
      float minCacheRatio = Float.MAX_VALUE;
      int leastCachedRegion = -1;
      for (int regionIndex : cluster.regionsPerServer[thisServer]) {
        float cacheRatioOnCurrentServer =
          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
        if (cacheRatioOnCurrentServer < minCacheRatio) {
          minCacheRatio = cacheRatioOnCurrentServer;
          leastCachedRegion = regionIndex;
        }
      }
      return leastCachedRegion;
    }
  }

  /**
   * Cost function driven by the number of regions hosted on each server.
   */
  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
    static final String REGION_COUNT_SKEW_COST_KEY =
      "hbase.master.balancer.stochastic.regionCountCost";
    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
    private final DoubleArrayCost cost = new DoubleArrayCost();

    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
      // Load multiplier should be the greatest as it is the most general way to balance data.
      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
    }

    @Override
    void prepare(BalancerClusterState cluster) {
      super.prepare(cluster);
      cost.prepare(cluster.numServers);
      cost.applyCostsChange(costs -> {
        for (int i = 0; i < cluster.numServers; i++) {
          costs[i] = cluster.regionsPerServer[i].length;
        }
      });
    }

    @Override
    protected double cost() {
      return cost.cost();
    }

    @Override
    protected void regionMoved(int region, int oldServer, int newServer) {
      // Only the two servers touched by the move change their region count.
      cost.applyCostsChange(costs -> {
        costs[oldServer] = cluster.regionsPerServer[oldServer].length;
        costs[newServer] = cluster.regionsPerServer[newServer].length;
      });
    }

    @Override
    public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
    }
  }

  /**
   * Cost function that compares how much of the regions' hot data is cached where the regions
   * currently are against the best that could be achieved, either on the best-cached server or,
   * optimistically, on a server with enough free block cache. The multiplier is forced to zero when
   * no persistent bucket cache path is configured.
   */
  class CacheAwareCostFunction extends CostFunction {
    private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
    /** Ratio of the current weighted cache sum to the best one, kept within [0, 1]. */
    private double cacheRatio;
    /** Best achievable weighted cache sum; used to normalize per-move deltas. */
    private double bestCacheRatio;
    private final float lowCacheRatioThreshold;
    private final float potentialCacheRatioAfterMove;
    private final float minFreeCacheSpaceFactor;

    private static final float DEFAULT_CACHE_COST = 20;

    CacheAwareCostFunction(Configuration conf) {
      boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null;
      // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled
      this.setMultiplier(
        !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST));
      bestCacheRatio = 0.0;
      cacheRatio = 0.0;
      lowCacheRatioThreshold =
        conf.getFloat(LOW_CACHE_RATIO_FOR_RELOCATION_KEY, LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT);
      // A region can never be more than fully cached, so cap the optimistic ratio at 1.
      potentialCacheRatioAfterMove = Math.min(1.0f, conf
        .getFloat(POTENTIAL_CACHE_RATIO_AFTER_MOVE_KEY, POTENTIAL_CACHE_RATIO_AFTER_MOVE_DEFAULT));
      minFreeCacheSpaceFactor =
        conf.getFloat(MIN_FREE_CACHE_SPACE_FACTOR_KEY, MIN_FREE_CACHE_SPACE_FACTOR_DEFAULT);
    }

    @Override
    void prepare(BalancerClusterState cluster) {
      super.prepare(cluster);
      recomputeCacheRatio(cluster);
      if (LOG.isDebugEnabled()) {
        LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
      }
    }

    /**
     * Free block cache, in bytes, a server must report before a region of the given hot-data size
     * (in MB) is assumed to fit there.
     */
    private long requiredFreeCacheBytes(int regionSizeMb) {
      return (long) (regionSizeMb * 1024L * 1024L * minFreeCacheSpaceFactor);
    }

    /**
     * Recomputes {@link #bestCacheRatio} and {@link #cacheRatio} from scratch for the given state.
     */
    private void recomputeCacheRatio(BalancerClusterState cluster) {
      double[] currentWeighted = computeCurrentWeightedContributions(cluster);
      double currentSum = 0.0;
      double bestCacheSum = 0.0;
      for (int region = 0; region < cluster.numRegions; region++) {
        currentSum += currentWeighted[region];
        // here we only get the server index where this region cache ratio is the highest
        int serverIndexBestCache = cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
        // get the highest cacheRatio for this region on the current state of allocations
        double currentHighestCache =
          cluster.getOrComputeWeightedRegionCacheRatio(region, serverIndexBestCache);
        // Get a hypothetical best cache ratio for this region if any server has enough free cache
        // to host it.
        double potentialHighestCache = potentialBestWeightedFromFreeCache(cluster, region);
        bestCacheSum += Math.max(currentHighestCache, potentialHighestCache);
      }
      bestCacheRatio = bestCacheSum;
      if (bestCacheSum <= 0.0) {
        // Nothing can be cached anywhere: an empty cluster is perfect, anything else is worst.
        cacheRatio = cluster.numRegions == 0 ? 1.0 : 0.0;
      } else {
        cacheRatio = Math.min(1.0, currentSum / bestCacheSum);
      }
    }

    /**
     * Computes, per region, the weighted cache contribution on the server the region is currently
     * assigned to. A poorly cached region that has been moved during the simulation to a server
     * with enough free block cache is credited the optimistic post-move ratio instead of the
     * observed one.
     */
    private double[] computeCurrentWeightedContributions(BalancerClusterState cluster) {
      int totalRegions = cluster.numRegions;
      double[] contrib = new double[totalRegions];
      for (int r = 0; r < totalRegions; r++) {
        int s = cluster.regionIndexToServerIndex[r];
        int sizeMb = cluster.getRegionSizeMinusColdDataMB(r);
        if (sizeMb <= 0) {
          contrib[r] = 0.0;
          continue;
        }
        boolean movedInSimulation = cluster.initialRegionIndexToServerIndex[r] != s;
        if (
          cluster.serverBlockCacheFreeSize != null && movedInSimulation
            && cluster.getSumRegionCacheAndColdDataRatio(r) < lowCacheRatioThreshold
        ) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Region {} is simulated moved to new server {}",
              cluster.regions[r].getEncodedName(), cluster.servers[s].getHostname());
          }
          if (cluster.serverBlockCacheFreeSize[s] >= requiredFreeCacheBytes(sizeMb)) {
            contrib[r] = sizeMb * potentialCacheRatioAfterMove;
            continue;
          }
        }
        contrib[r] = cluster.getOrComputeWeightedRegionCacheRatio(r, s);
      }
      return contrib;
    }

    /**
     * If this region is cold in metrics and at least one RS (including its current host) reports
     * enough free block cache to hold it, return an optimistic weighted cache score
     * ({@link #potentialCacheRatioAfterMove} * region MB) so placement is not considered optimal
     * solely from low ratios when capacity exists somewhere in the cluster.
     * @return the optimistic weighted score, or {@code 0} when the region is not poorly cached, has
     *         no hot data, free cache sizes are unavailable, or no server has enough free cache
     */
    private double potentialBestWeightedFromFreeCache(BalancerClusterState cluster, int region) {
      // Free cache sizes are absent until cluster metrics have been received.
      if (cluster.serverBlockCacheFreeSize == null) {
        return 0.0;
      }
      float observedRatio = cluster.getSumRegionCacheAndColdDataRatio(region);
      if (observedRatio >= lowCacheRatioThreshold) {
        return 0.0;
      }
      int regionSizeMb = cluster.getRegionSizeMinusColdDataMB(region);
      if (regionSizeMb <= 0) {
        return 0.0;
      }
      long requiredFree = requiredFreeCacheBytes(regionSizeMb);
      for (int s = 0; s < cluster.numServers; s++) {
        if (cluster.serverBlockCacheFreeSize[s] >= requiredFree) {
          return regionSizeMb * potentialCacheRatioAfterMove;
        }
      }
      return 0.0;
    }

    @Override
    protected double cost() {
      return scale(0, 1, 1 - cacheRatio);
    }

    /**
     * Applies the effect of a simulated region move on {@link #cacheRatio}, or reverts the last
     * applied effect when the call is the undo of that move.
     */
    @Override
    protected void regionMoved(int region, int oldServer, int newServer) {
      double regionCacheRatioOnOldServer =
        cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
      // NOTE: this check relies on BigDecimal.equals() being scale-sensitive. The generators reset
      // the field to BigDecimal.ZERO (scale 0), while BigDecimal.valueOf(double) below always
      // produces a value with a different scale, even for a delta of 0.0. Hence "equals ZERO" means
      // "no move applied since the last reset". Do not replace with compareTo()/signum() without
      // revisiting the undo handling.
      if (simulatedRatio.equals(BigDecimal.ZERO)) {
        int regionSizeMb = cluster.getRegionSizeMinusColdDataMB(region);
        double potentialCachedSizeOnNewServer = regionSizeMb * potentialCacheRatioAfterMove;
        // The free cache size is compared against a byte threshold, as everywhere else in this
        // class, and only when free cache sizes are available at all.
        boolean simulateCacheBasedOnFreeSpace = cluster.serverBlockCacheFreeSize != null
          && cluster.getOrComputeRegionCacheRatio(region, oldServer) < lowCacheRatioThreshold
          && cluster.serverBlockCacheFreeSize[newServer] >= requiredFreeCacheBytes(regionSizeMb);
        double regionCacheRatioOnNewServer = simulateCacheBasedOnFreeSpace
          ? potentialCachedSizeOnNewServer
          : cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
        double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
        double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "simulating moving region {} using simulateCacheBasedOnFreeSpace={} "
              + "got a normalized delta of {} to be added to cacheRatio: {}",
            cluster.regions[region].getEncodedName(), simulateCacheBasedOnFreeSpace,
            normalizedDelta, cacheRatio);
        }
        simulatedRatio = BigDecimal.valueOf(normalizedDelta);
        cacheRatio += normalizedDelta;
        if (cacheRatio < 0.0 || cacheRatio > 1.0) {
          // The incremental update drifted out of range: fall back to a full recomputation.
          LOG.info(
            "Recomputing cacheRatio after calculating impact of region move: \n "
              + "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:"
              + "regionCacheRatioOnOldServer:{}:regionCacheRatioOnNewServer:{}:"
              + "bestRegionCacheRatio:{}:cacheRatio:{}",
            cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
            cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
            regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
          recomputeCacheRatio(cluster);
        }
      } else {
        // This means we are in an undoAction call and need to reverse the cache delta applied in
        // the region move simulation
        cacheRatio -= simulatedRatio.doubleValue();
      }
    }

    private int getServerWithBestCacheRatioForRegion(int region) {
      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
    }

    @Override
    public void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
    }
  }
}