/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;

import java.text.DecimalFormat;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions
 * based on the amount they are cached on a given server. A region can move across the region
 * servers whenever a region server shuts down or crashes. The region server preserves the cache
 * periodically and restores the cache when it is restarted. This balancer implements a mechanism
 * where it maintains the amount by which a region is cached on a region server. During balancer
 * run, a region plan is generated that takes into account this cache information and tries to
 * move the regions so that the cache is minimally impacted.
 */
@InterfaceAudience.Private
public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);

  /**
   * A region is moved back to its old server when the ratio "cache ratio on current server / cache
   * ratio on old server" is below this value.
   */
  private static final float CACHE_RATIO_DIFF_THRESHOLD = 0.6f;

  private Configuration configuration;

  public enum GeneratorFunctionType {
    LOAD,
    CACHE_RATIO
  }

  /**
   * Stores the configuration, resets the inherited cost function list and then delegates to the
   * parent implementation.
   * @param configuration the configuration to load
   */
  @Override
  public synchronized void loadConf(Configuration configuration) {
    this.configuration = configuration;
    this.costFunctions = new ArrayList<>();
    super.loadConf(configuration);
  }

  /**
   * Registers the two cache aware candidate generators defined in this class.
   * @return a map from generator class to a fresh generator instance
   */
  @Override
  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
    createCandidateGenerators() {
    Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
      new HashMap<>(2);
    candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class,
      new CacheAwareSkewnessCandidateGenerator());
    candidateGenerators.put(CacheAwareCandidateGenerator.class, new CacheAwareCandidateGenerator());
    return candidateGenerators;
  }

  /**
   * Builds the cost functions used by this balancer. A cost function is only included when its
   * multiplier is greater than zero.
   * @param configuration the configuration the cost functions read their multipliers from
   * @return the enabled cost functions
   */
  @Override
  protected List<CostFunction> createCostFunctions(Configuration configuration) {
    List<CostFunction> costFunctions = new ArrayList<>();
    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
    return costFunctions;
  }

  /** Adds {@code costFunction} to {@code costFunctions} only if its multiplier is positive. */
  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
    if (costFunction.getMultiplier() > 0) {
      costFunctions.add(costFunction);
    }
  }

  /**
   * Records the latest cluster metrics and rebuilds the region load and cache ratio maps from
   * them.
   * @param clusterMetrics the latest cluster metrics
   */
  @Override
  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
    this.clusterStatus = clusterMetrics;
    updateRegionLoad();
  }

  /**
   * Collect the amount of region cached for all the regions from all the active region servers.
   */
  private void updateRegionLoad() {
    loads = new HashMap<>();
    regionCacheRatioOnOldServerMap = new HashMap<>();
    // Despite its name, this map holds the current hosting server and the region size in MB.
    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();

    // Build current region cache statistics
    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
      // Create a map of region and the server where it is currently hosted
      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
        String regionEncodedName = RegionInfo.encodeRegionName(regionName);

        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();

        // Get the total size of the hFiles in this region
        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);

        rload.add(new BalancerRegionLoad(rm));
        // Maintain a map of region and its total size. This is needed to calculate the cache
        // ratios for the regions cached on old region servers
        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
        loads.put(regionEncodedName, rload);
      });
    });

    // Build cache statistics for the regions hosted previously on old region servers
    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
      // Find if a region was previously hosted on a server other than the one it is currently
      // hosted on.
      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
        Pair<ServerName, Integer> currentHostAndSize =
          regionCacheRatioOnCurrentServerMap.get(regionEncodedName);
        if (currentHostAndSize == null) {
          // The region is not reported by any live server, nothing to compare against
          return;
        }
        // A server that reports cached data for a region it does not currently host is treated
        // as the region's old server
        if (!ServerName.isSameAddress(currentHostAndSize.getFirst(), sn)) {
          int regionSizeMB = currentHostAndSize.getSecond();
          float regionCacheRatioOnOldServer =
            regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB;
          regionCacheRatioOnOldServerMap.put(regionEncodedName,
            new Pair<>(sn, regionCacheRatioOnOldServer));
        }
      });
    });
  }

  /**
   * Looks up a region of the cluster by its encoded name.
   * @param cluster    the cluster state whose regions are searched
   * @param regionName the encoded region name to look for
   * @return the first matching region, or {@code null} if there is none
   */
  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
    Optional<RegionInfo> regionInfoOptional = Arrays.stream(cluster.regions)
      .filter((RegionInfo ri) -> regionName.equals(ri.getEncodedName())).findFirst();
    return regionInfoOptional.orElse(null);
  }

  /**
   * Candidate generator that proposes moving a region back to the server it was previously cached
   * on when the cache ratio there is favourable. Each call consumes one entry of
   * {@code regionCacheRatioOnOldServerMap}.
   */
  private class CacheAwareCandidateGenerator extends CandidateGenerator {
    @Override
    protected BalanceAction generate(BalancerClusterState cluster) {
      // Move the regions to the servers they were previously hosted on based on the cache ratio
      if (regionCacheRatioOnOldServerMap.isEmpty()) {
        return BalanceAction.NULL_ACTION;
      }
      Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap =
        regionCacheRatioOnOldServerMap.entrySet().iterator().next();
      // Get the server where this region was previously hosted
      String regionEncodedName = regionCacheRatioServerMap.getKey();
      ServerName oldServer = regionCacheRatioServerMap.getValue().getFirst();

      RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
      if (regionInfo == null) {
        LOG.warn("Region {} not found", regionEncodedName);
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      int regionIndex = cluster.regionsToIndex.get(regionInfo);
      // getOrDefault avoids a NullPointerException on unboxing when the old server is not part
      // of the cluster state; a plain get() would never reach the "not found" branch below
      int oldServerIndex = cluster.serversToIndex.getOrDefault(oldServer.getAddress(), -1);
      if (oldServerIndex < 0) {
        LOG.warn("Server previously hosting region {} not found", regionEncodedName);
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }

      float oldRegionCacheRatio =
        cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
      int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
      float currentRegionCacheRatio =
        cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);

      BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
        currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
      // The entry is deliberately removed only after the cache ratios have been looked up
      regionCacheRatioOnOldServerMap.remove(regionEncodedName);
      return action;
    }

    /**
     * Returns a move action towards the old server when {@link #moveRegionToOldServer} approves
     * the move, {@link BalanceAction#NULL_ACTION} otherwise.
     */
    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
      float cacheRatioOnOldServer) {
      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
          : BalanceAction.NULL_ACTION;
    }

    /**
     * Decides whether a region should be moved from its current server back to its old server.
     * @param cluster                   the cluster state, used for log messages only
     * @param regionIndex               index of the region
     * @param currentServerIndex        index of the server currently hosting the region
     * @param cacheRatioOnCurrentServer cache ratio of the region on the current server
     * @param oldServerIndex            index of the server that previously hosted the region
     * @param cacheRatioOnOldServer     cache ratio of the region on the old server
     * @return {@code true} if the region should be moved to the old server
     */
    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
      float cacheRatioOnOldServer) {
      if (currentServerIndex < 0 || oldServerIndex < 0) {
        return false;
      }
      // Find if the region has already moved by comparing the current server index with the
      // old server index. This can happen when other candidate generator has moved the region
      if (currentServerIndex == oldServerIndex) {
        return false;
      }

      // Conditions for moving the region

      // If the region is fully cached on the old server, move the region back
      if (cacheRatioOnOldServer == 1.0f) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
        }
        return true;
      }

      // Move the region back to the old server if it is cached equally on both the servers
      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
            cluster.servers[oldServerIndex], formatRatio(cacheRatioOnCurrentServer));
        }
        return true;
      }

      // If the region is not fully cached on either of the servers, move the region back to the
      // old server if the region cache ratio on the current server is still much less than the old
      // server
      if (
        cacheRatioOnOldServer > 0.0f
          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < CACHE_RATIO_DIFF_THRESHOLD
      ) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
              + "cache ratio {}",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
            cluster.servers[oldServerIndex], formatRatio(cacheRatioOnOldServer),
            formatRatio(cacheRatioOnCurrentServer));
        }
        return true;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
          cluster.servers[oldServerIndex], formatRatio(cacheRatioOnCurrentServer),
          formatRatio(cacheRatioOnOldServer));
      }
      return false;
    }

    /**
     * Formats a cache ratio with at most four fraction digits for log output. A new formatter is
     * created per call; this is only invoked from debug-guarded paths.
     */
    private String formatRatio(float ratio) {
      DecimalFormat df = new DecimalFormat("#");
      df.setMaximumFractionDigits(4);
      return df.format(ratio);
    }
  }

  /**
   * Candidate generator that first proposes moving regions back to the servers recorded in
   * {@code regionCacheRatioOnOldServerMap} and, once that map is exhausted, proposes moving the
   * least cached region of the given server to the other server.
   */
  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
    @Override
    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
      // First move all the regions which were hosted previously on some other server back to their
      // old servers
      if (!regionCacheRatioOnOldServerMap.isEmpty()) {
        // Get the first region index in the historical cache ratio list
        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
        String regionEncodedName = regionEntry.getKey();
        ServerName oldServer = regionEntry.getValue().getFirst();

        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
        if (regionInfo == null) {
          LOG.warn("Region {} does not exist", regionEncodedName);
          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
          return BalanceAction.NULL_ACTION;
        }
        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
          return BalanceAction.NULL_ACTION;
        }

        int regionIndex = cluster.regionsToIndex.get(regionInfo);

        // Get the server currently hosting this region
        int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];

        // Get the old server index; -1 when the old server is not part of the cluster state.
        // A plain get() would throw on unboxing instead of reaching the check below.
        int oldServerIndex = cluster.serversToIndex.getOrDefault(oldServer.getAddress(), -1);

        regionCacheRatioOnOldServerMap.remove(regionEncodedName);

        if (oldServerIndex < 0) {
          // The old server has been moved to other host and hence, the region cannot be moved back
          // to the old server
          if (LOG.isDebugEnabled()) {
            LOG.debug(
              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
                + "server {} as the server does not exist",
              regionEncodedName, oldServer.getHostname());
          }
          return BalanceAction.NULL_ACTION;
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
              + "was hosted their earlier",
            regionEncodedName, cluster.servers[currentServerIndex].getHostname(),
            cluster.servers[oldServerIndex].getHostname());
        }

        return getAction(currentServerIndex, regionIndex, oldServerIndex, -1);
      }

      if (thisServer < 0 || otherServer < 0) {
        return BalanceAction.NULL_ACTION;
      }

      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
      if (regionIndexToMove < 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
        }
        return BalanceAction.NULL_ACTION;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
            + "least cached on current server",
          cluster.regions[regionIndexToMove].getEncodedName(),
          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
      }
      return getAction(thisServer, regionIndexToMove, otherServer, -1);
    }

    /**
     * Finds the region with the lowest cache ratio among the regions hosted on a server.
     * @param cluster    the cluster state
     * @param thisServer index of the server whose regions are inspected
     * @return the index of the least cached region, or -1 if the server hosts no region
     */
    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
      float minCacheRatio = Float.MAX_VALUE;
      int leastCachedRegion = -1;
      for (int regionIndex : cluster.regionsPerServer[thisServer]) {
        float cacheRatioOnCurrentServer =
          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
        // Strict comparison keeps the first region encountered on ties
        if (cacheRatioOnCurrentServer < minCacheRatio) {
          minCacheRatio = cacheRatioOnCurrentServer;
          leastCachedRegion = regionIndex;
        }
      }
      return leastCachedRegion;
    }
  }

  /**
   * Cost function fed with the number of regions hosted on each server. The multiplier is read
   * from {@link #REGION_COUNT_SKEW_COST_KEY}.
   */
  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
    static final String REGION_COUNT_SKEW_COST_KEY =
      "hbase.master.balancer.stochastic.regionCountCost";
    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
    private final DoubleArrayCost cost = new DoubleArrayCost();

    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
      // Load multiplier should be the greatest as it is the most general way to balance data.
      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
    }

    @Override
    void prepare(BalancerClusterState cluster) {
      super.prepare(cluster);
      cost.prepare(cluster.numServers);
      // Seed the per-server costs with the number of regions each server hosts
      cost.applyCostsChange(costs -> {
        for (int i = 0; i < cluster.numServers; i++) {
          costs[i] = cluster.regionsPerServer[i].length;
        }
      });
    }

    @Override
    protected double cost() {
      return cost.cost();
    }

    @Override
    protected void regionMoved(int region, int oldServer, int newServer) {
      // Only the two servers involved in the move need their region counts refreshed
      cost.applyCostsChange(costs -> {
        costs[oldServer] = cluster.regionsPerServer[oldServer].length;
        costs[newServer] = cluster.regionsPerServer[newServer].length;
      });
    }

    @Override
    public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
    }
  }

  /**
   * Cost function that compares the weighted cache ratio of the regions on their current servers
   * with the weighted cache ratio on the servers with the best cache ratio for each region. The
   * multiplier is zero, i.e. the function is disabled, when
   * {@code BUCKET_CACHE_PERSISTENT_PATH_KEY} is not configured.
   */
  static class CacheAwareCostFunction extends CostFunction {
    private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
    private double cacheRatio;
    private double bestCacheRatio;

    private static final float DEFAULT_CACHE_COST = 20;

    CacheAwareCostFunction(Configuration conf) {
      boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null;
      // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled
      this.setMultiplier(
        !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST));
      bestCacheRatio = 0.0;
      cacheRatio = 0.0;
    }

    @Override
    void prepare(BalancerClusterState cluster) {
      super.prepare(cluster);
      cacheRatio = 0.0;
      bestCacheRatio = 0.0;

      for (int region = 0; region < cluster.numRegions; region++) {
        cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
          cluster.regionIndexToServerIndex[region]);
        bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
          getServerWithBestCacheRatioForRegion(region));
      }

      // Normalize against the best achievable total; with no cached data at all the current
      // placement is considered optimal
      cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio;
      if (LOG.isDebugEnabled()) {
        LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
      }
    }

    @Override
    protected double cost() {
      return scale(0, 1, 1 - cacheRatio);
    }

    @Override
    protected void regionMoved(int region, int oldServer, int newServer) {
      double regionCacheRatioOnOldServer =
        cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
      double regionCacheRatioOnNewServer =
        cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
      double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
      // Apply the move incrementally instead of recomputing the sum over all regions
      double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
      cacheRatio += normalizedDelta;
      // Only log when the normalized ratio drifts outside of the expected [0, 1] range
      if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
        LOG.debug(
          "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
            + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
          cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
          cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
          regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
      }
    }

    /** Returns the index of the server reported by the cluster state as best for the region. */
    private int getServerWithBestCacheRatioForRegion(int region) {
      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
    }

    @Override
    public void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
    }
  }
}