001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020/** An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions 021 * based on the amount they are cached on a given server. A region can move across the region 022 * servers whenever a region server shuts down or crashes. The region server preserves the cache 023 * periodically and restores the cache when it is restarted. This balancer implements a mechanism 024 * where it maintains the amount by which a region is cached on a region server. During balancer 025 * run, a region plan is generated that takes into account this cache information and tries to 026 * move the regions so that the cache minimally impacted. 027 */ 028 029import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY; 030 031import java.text.DecimalFormat; 032import java.util.ArrayDeque; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Deque; 036import java.util.HashMap; 037import java.util.List; 038import java.util.Map; 039import java.util.Optional; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.ClusterMetrics; 042import org.apache.hadoop.hbase.RegionMetrics; 043import org.apache.hadoop.hbase.ServerMetrics; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.Size; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.util.Pair; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052@InterfaceAudience.Private 053public class CacheAwareLoadBalancer extends StochasticLoadBalancer { 054 private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class); 055 056 private Configuration configuration; 057 058 public enum GeneratorFunctionType { 059 LOAD, 060 CACHE_RATIO 061 } 062 063 @Override 064 public synchronized void loadConf(Configuration configuration) { 065 this.configuration = configuration; 066 this.costFunctions = new ArrayList<>(); 067 super.loadConf(configuration); 068 } 069 070 @Override 071 protected List<CandidateGenerator> createCandidateGenerators() { 072 List<CandidateGenerator> candidateGenerators = new ArrayList<>(2); 073 candidateGenerators.add(GeneratorFunctionType.LOAD.ordinal(), 074 new CacheAwareSkewnessCandidateGenerator()); 075 candidateGenerators.add(GeneratorFunctionType.CACHE_RATIO.ordinal(), 076 new CacheAwareCandidateGenerator()); 077 return candidateGenerators; 078 } 079 080 @Override 081 protected List<CostFunction> createCostFunctions(Configuration configuration) { 082 List<CostFunction> costFunctions = new ArrayList<>(); 083 addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration)); 084 addCostFunction(costFunctions, new CacheAwareCostFunction(configuration)); 085 return costFunctions; 086 } 087 088 private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) { 089 if (costFunction.getMultiplier() > 0) { 090 costFunctions.add(costFunction); 091 } 092 } 093 094 @Override 095 public void updateClusterMetrics(ClusterMetrics clusterMetrics) { 096 this.clusterStatus = clusterMetrics; 097 updateRegionLoad(); 098 } 099 100 /** 101 * Collect the amount of region cached for all the regions from all the active region servers. 102 */ 103 private void updateRegionLoad() { 104 loads = new HashMap<>(); 105 regionCacheRatioOnOldServerMap = new HashMap<>(); 106 Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>(); 107 108 // Build current region cache statistics 109 clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> { 110 // Create a map of region and the server where it is currently hosted 111 sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> { 112 String regionEncodedName = RegionInfo.encodeRegionName(regionName); 113 114 Deque<BalancerRegionLoad> rload = new ArrayDeque<>(); 115 116 // Get the total size of the hFiles in this region 117 int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE); 118 119 rload.add(new BalancerRegionLoad(rm)); 120 // Maintain a map of region and it's total size. This is needed to calculate the cache 121 // ratios for the regions cached on old region servers 122 regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB)); 123 loads.put(regionEncodedName, rload); 124 }); 125 }); 126 127 // Build cache statistics for the regions hosted previously on old region servers 128 clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> { 129 // Find if a region was previously hosted on a server other than the one it is currently 130 // hosted on. 131 sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> { 132 // If the region is found in regionCacheRatioOnCurrentServerMap, it is currently hosted on 133 // this server 134 if (regionCacheRatioOnCurrentServerMap.containsKey(regionEncodedName)) { 135 ServerName currentServer = 136 regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getFirst(); 137 if (!ServerName.isSameAddress(currentServer, sn)) { 138 int regionSizeMB = 139 regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond(); 140 float regionCacheRatioOnOldServer = 141 regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB; 142 regionCacheRatioOnOldServerMap.put(regionEncodedName, 143 new Pair<>(sn, regionCacheRatioOnOldServer)); 144 } 145 } 146 }); 147 }); 148 } 149 150 private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) { 151 Optional<RegionInfo> regionInfoOptional = 152 Arrays.stream(cluster.regions).filter((RegionInfo ri) -> { 153 return regionName.equals(ri.getEncodedName()); 154 }).findFirst(); 155 156 if (regionInfoOptional.isPresent()) { 157 return regionInfoOptional.get(); 158 } 159 return null; 160 } 161 162 private class CacheAwareCandidateGenerator extends CandidateGenerator { 163 @Override 164 protected BalanceAction generate(BalancerClusterState cluster) { 165 // Move the regions to the servers they were previously hosted on based on the cache ratio 166 if ( 167 !regionCacheRatioOnOldServerMap.isEmpty() 168 && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext() 169 ) { 170 Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap = 171 regionCacheRatioOnOldServerMap.entrySet().iterator().next(); 172 // Get the server where this region was previously hosted 173 String regionEncodedName = regionCacheRatioServerMap.getKey(); 174 RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName); 175 if (regionInfo == null) { 176 LOG.warn("Region {} not found", regionEncodedName); 177 regionCacheRatioOnOldServerMap.remove(regionEncodedName); 178 return BalanceAction.NULL_ACTION; 179 } 180 if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) { 181 regionCacheRatioOnOldServerMap.remove(regionEncodedName); 182 return BalanceAction.NULL_ACTION; 183 } 184 int regionIndex = cluster.regionsToIndex.get(regionInfo); 185 int oldServerIndex = cluster.serversToIndex 186 .get(regionCacheRatioOnOldServerMap.get(regionEncodedName).getFirst().getAddress()); 187 if (oldServerIndex < 0) { 188 LOG.warn("Server previously hosting region {} not found", regionEncodedName); 189 regionCacheRatioOnOldServerMap.remove(regionEncodedName); 190 return BalanceAction.NULL_ACTION; 191 } 192 193 float oldRegionCacheRatio = 194 cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex); 195 int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex]; 196 float currentRegionCacheRatio = 197 cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex); 198 199 BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex, 200 currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio); 201 regionCacheRatioOnOldServerMap.remove(regionEncodedName); 202 return action; 203 } 204 return BalanceAction.NULL_ACTION; 205 } 206 207 private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex, 208 int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex, 209 float cacheRatioOnOldServer) { 210 return moveRegionToOldServer(cluster, regionIndex, currentServerIndex, 211 cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer) 212 ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1) 213 : BalanceAction.NULL_ACTION; 214 } 215 216 private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex, 217 int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex, 218 float cacheRatioOnOldServer) { 219 // Find if the region has already moved by comparing the current server index with the 220 // current server index. This can happen when other candidate generator has moved the region 221 if (currentServerIndex < 0 || oldServerIndex < 0) { 222 return false; 223 } 224 225 DecimalFormat df = new DecimalFormat("#"); 226 df.setMaximumFractionDigits(4); 227 228 float cacheRatioDiffThreshold = 0.6f; 229 230 // Conditions for moving the region 231 232 // If the region is fully cached on the old server, move the region back 233 if (cacheRatioOnOldServer == 1.0f) { 234 if (LOG.isDebugEnabled()) { 235 LOG.debug("Region {} moved to the old server {} as it is fully cached there", 236 cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]); 237 } 238 return true; 239 } 240 241 // Move the region back to the old server if it is cached equally on both the servers 242 if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) { 243 if (LOG.isDebugEnabled()) { 244 LOG.debug( 245 "Region {} moved from {} to {} as the region is cached {} equally on both servers", 246 cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex], 247 cluster.servers[oldServerIndex], df.format(cacheRatioOnCurrentServer)); 248 } 249 return true; 250 } 251 252 // If the region is not fully cached on either of the servers, move the region back to the 253 // old server if the region cache ratio on the current server is still much less than the old 254 // server 255 if ( 256 cacheRatioOnOldServer > 0.0f 257 && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold 258 ) { 259 if (LOG.isDebugEnabled()) { 260 LOG.debug( 261 "Region {} moved from {} to {} as region cache ratio {} is better than the current " 262 + "cache ratio {}", 263 cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex], 264 cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, 265 df.format(cacheRatioOnCurrentServer)); 266 } 267 return true; 268 } 269 270 if (LOG.isDebugEnabled()) { 271 LOG.debug( 272 "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}", 273 cluster.regions[regionIndex], cluster.servers[currentServerIndex], 274 cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, 275 df.format(cacheRatioOnCurrentServer)); 276 } 277 return false; 278 } 279 } 280 281 private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator { 282 @Override 283 BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) { 284 // First move all the regions which were hosted previously on some other server back to their 285 // old servers 286 if ( 287 !regionCacheRatioOnOldServerMap.isEmpty() 288 && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext() 289 ) { 290 // Get the first region index in the historical cache ratio list 291 Map.Entry<String, Pair<ServerName, Float>> regionEntry = 292 regionCacheRatioOnOldServerMap.entrySet().iterator().next(); 293 String regionEncodedName = regionEntry.getKey(); 294 295 RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName); 296 if (regionInfo == null) { 297 LOG.warn("Region {} does not exist", regionEncodedName); 298 regionCacheRatioOnOldServerMap.remove(regionEncodedName); 299 return BalanceAction.NULL_ACTION; 300 } 301 if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) { 302 regionCacheRatioOnOldServerMap.remove(regionEncodedName); 303 return BalanceAction.NULL_ACTION; 304 } 305 306 int regionIndex = cluster.regionsToIndex.get(regionInfo); 307 308 // Get the current host name for this region 309 thisServer = cluster.regionIndexToServerIndex[regionIndex]; 310 311 // Get the old server index 312 otherServer = cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress()); 313 314 regionCacheRatioOnOldServerMap.remove(regionEncodedName); 315 316 if (otherServer < 0) { 317 // The old server has been moved to other host and hence, the region cannot be moved back 318 // to the old server 319 if (LOG.isDebugEnabled()) { 320 LOG.debug( 321 "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old " 322 + "server {} as the server does not exist", 323 regionEncodedName, regionEntry.getValue().getFirst().getHostname()); 324 } 325 return BalanceAction.NULL_ACTION; 326 } 327 328 if (LOG.isDebugEnabled()) { 329 LOG.debug( 330 "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it " 331 + "was hosted their earlier", 332 regionEncodedName, cluster.servers[thisServer].getHostname(), 333 cluster.servers[otherServer].getHostname()); 334 } 335 336 return getAction(thisServer, regionIndex, otherServer, -1); 337 } 338 339 if (thisServer < 0 || otherServer < 0) { 340 return BalanceAction.NULL_ACTION; 341 } 342 343 int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer); 344 if (regionIndexToMove < 0) { 345 if (LOG.isDebugEnabled()) { 346 LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement"); 347 } 348 return BalanceAction.NULL_ACTION; 349 } 350 if (LOG.isDebugEnabled()) { 351 LOG.debug( 352 "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is " 353 + "least cached on current server", 354 cluster.regions[regionIndexToMove].getEncodedName(), 355 cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname()); 356 } 357 return getAction(thisServer, regionIndexToMove, otherServer, -1); 358 } 359 360 private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) { 361 float minCacheRatio = Float.MAX_VALUE; 362 int leastCachedRegion = -1; 363 for (int i = 0; i < cluster.regionsPerServer[thisServer].length; i++) { 364 int regionIndex = cluster.regionsPerServer[thisServer][i]; 365 366 float cacheRatioOnCurrentServer = 367 cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer); 368 if (cacheRatioOnCurrentServer < minCacheRatio) { 369 minCacheRatio = cacheRatioOnCurrentServer; 370 leastCachedRegion = regionIndex; 371 } 372 } 373 return leastCachedRegion; 374 } 375 } 376 377 static class CacheAwareRegionSkewnessCostFunction extends CostFunction { 378 static final String REGION_COUNT_SKEW_COST_KEY = 379 "hbase.master.balancer.stochastic.regionCountCost"; 380 static final float DEFAULT_REGION_COUNT_SKEW_COST = 20; 381 private final DoubleArrayCost cost = new DoubleArrayCost(); 382 383 CacheAwareRegionSkewnessCostFunction(Configuration conf) { 384 // Load multiplier should be the greatest as it is the most general way to balance data. 385 this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST)); 386 } 387 388 @Override 389 void prepare(BalancerClusterState cluster) { 390 super.prepare(cluster); 391 cost.prepare(cluster.numServers); 392 cost.applyCostsChange(costs -> { 393 for (int i = 0; i < cluster.numServers; i++) { 394 costs[i] = cluster.regionsPerServer[i].length; 395 } 396 }); 397 } 398 399 @Override 400 protected double cost() { 401 return cost.cost(); 402 } 403 404 @Override 405 protected void regionMoved(int region, int oldServer, int newServer) { 406 cost.applyCostsChange(costs -> { 407 costs[oldServer] = cluster.regionsPerServer[oldServer].length; 408 costs[newServer] = cluster.regionsPerServer[newServer].length; 409 }); 410 } 411 412 public final void updateWeight(double[] weights) { 413 weights[GeneratorFunctionType.LOAD.ordinal()] += cost(); 414 } 415 } 416 417 static class CacheAwareCostFunction extends CostFunction { 418 private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost"; 419 private double cacheRatio; 420 private double bestCacheRatio; 421 422 private static final float DEFAULT_CACHE_COST = 20; 423 424 CacheAwareCostFunction(Configuration conf) { 425 boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null; 426 // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled 427 this.setMultiplier( 428 !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST)); 429 bestCacheRatio = 0.0; 430 cacheRatio = 0.0; 431 } 432 433 @Override 434 void prepare(BalancerClusterState cluster) { 435 super.prepare(cluster); 436 cacheRatio = 0.0; 437 bestCacheRatio = 0.0; 438 439 for (int region = 0; region < cluster.numRegions; region++) { 440 cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region, 441 cluster.regionIndexToServerIndex[region]); 442 bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region, 443 getServerWithBestCacheRatioForRegion(region)); 444 } 445 446 cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio; 447 if (LOG.isDebugEnabled()) { 448 LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio); 449 } 450 } 451 452 @Override 453 protected double cost() { 454 return scale(0, 1, 1 - cacheRatio); 455 } 456 457 @Override 458 protected void regionMoved(int region, int oldServer, int newServer) { 459 double regionCacheRatioOnOldServer = 460 cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer); 461 double regionCacheRatioOnNewServer = 462 cluster.getOrComputeWeightedRegionCacheRatio(region, newServer); 463 double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer; 464 double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio; 465 cacheRatio += normalizedDelta; 466 if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) { 467 LOG.debug( 468 "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:" 469 + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}", 470 cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(), 471 cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer, 472 regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio); 473 } 474 } 475 476 private int getServerWithBestCacheRatioForRegion(int region) { 477 return cluster.getOrComputeServerWithBestRegionCachedRatio()[region]; 478 } 479 480 @Override 481 public final void updateWeight(double[] weights) { 482 weights[GeneratorFunctionType.CACHE_RATIO.ordinal()] += cost(); 483 } 484 } 485}