001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertNotNull; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.when; 026 027import java.util.ArrayList; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033import java.util.TreeMap; 034import java.util.concurrent.CountDownLatch; 035import java.util.concurrent.ExecutorService; 036import java.util.concurrent.Executors; 037import java.util.concurrent.Future; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.ClusterMetrics; 040import org.apache.hadoop.hbase.HBaseConfiguration; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.ServerMetrics; 043import org.apache.hadoop.hbase.ServerName; 044import org.apache.hadoop.hbase.TableName; 045import 
org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests {@link RSGroupBasedLoadBalancer} configured with {@link CacheAwareLoadBalancer} as its
 * internal balancer: cache-aware region placement, move throttling, ordering of region plans, and
 * configuration changes that arrive while a balance run is in progress.
 */
@Tag(LargeTests.TAG)
public class TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal
  extends RSGroupableBalancerTestBase {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);

  /** Value for {@link HConstants#BUCKET_CACHE_PERSISTENT_PATH_KEY} used by every configuration. */
  private static final String PREFETCH_FILE_LIST = "prefetch_file_list";

  /** Key selecting the balancer that {@link RSGroupBasedLoadBalancer} delegates to. */
  private static final String GROUP_BALANCER_CLASS_KEY = "hbase.rsgroup.grouploadbalancer.class";

  /** Move throttling, in milliseconds, used by the tests that expect a move to be throttled. */
  private static final long THROTTLING_MILLIS = 100;

  /** Balancer shared by the balanceCluster tests; built once in {@link #beforeAllTests()}. */
  private static RSGroupBasedLoadBalancer loadBalancer;

  @BeforeAll
  public static void beforeAllTests() throws Exception {
    groups = new String[] { RSGroupInfo.DEFAULT_GROUP };
    servers = generateServers(3);
    groupMap = constructGroupInfo(servers, groups);
    tableDescs = constructTableDesc(false);
    // Configure the inherited shared 'conf' field rather than a fresh local copy.
    // NOTE(review): presumably getMockedMaster() hands this field to the balancer as the master
    // configuration — confirm in RSGroupableBalancerTestBase.
    configureForCacheAwareBalancer(conf);
    loadBalancer = newInitializedBalancer();
  }

  /**
   * No region is cached on its current server or on any previous server, leaving the cluster
   * skewed (10/0/5). The balancer should still fix the skew by moving five regions from server0 to
   * the empty server1.
   */
  @Test
  public void testRegionsNotCachedOnOldServerAndCurrentServer() throws Exception {
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, new ArrayList<>(), 0, 10));
    applyClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));
    // should move 5 regions from server0 to server1
    assertRegionsMoved(plans, server0, server1, 5);
  }

  /**
   * Regions on the overloaded RS report low block-cache ratio; no RS reports prefetch/historical
   * cache for those regions (so {@link CacheAwareLoadBalancer.CacheAwareCandidateGenerator} has no
   * "old server" to prefer). Another RS has ample free block cache. The balancer should still emit
   * plans that shed load from the hot RS onto the idle RS with spare cache capacity.
   */
  @Test
  public void testLowCacheRatioNoHistoricalCacheRelocatesWhenTargetHasFreeBlockCache()
    throws Exception {
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Below LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT (0.35)
    ServerMetrics sm0 = mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, 0.1f,
      new ArrayList<>(), 0, 10);
    when(sm0.getCacheFreeSize()).thenReturn(0L);
    ServerMetrics sm1 = mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, 0.0f,
      new ArrayList<>(), 0, 10);
    // Simulates 1GB free cache space on server1
    when(sm1.getCacheFreeSize()).thenReturn(1024L * 1024 * 1024);
    ServerMetrics sm2 = mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, 1.0f,
      new ArrayList<>(), 0, 10);
    when(sm2.getCacheFreeSize()).thenReturn(0L);

    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, sm0);
    serverMetricsMap.put(server1, sm1);
    serverMetricsMap.put(server2, sm2);
    applyClusterMetrics(loadBalancer, serverMetricsMap);

    CacheAwareLoadBalancer internalBalancer = internalOf(loadBalancer);
    assertNotNull(internalBalancer);
    // No server reported historical cache, so there is no "old server" for any region
    assertTrue(internalBalancer.regionCacheRatioOnOldServerMap.isEmpty());

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));
    assertRegionsMoved(plans, server0, server1, 5);
  }

  /**
   * Half of server0's regions were previously hosted on server1, which still reports historical
   * cache for them. Those are exactly the regions that should move back to server1.
   */
  @Test
  public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws Exception {
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock the last 5 regions of server0 as previously hosted on server1
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());

    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions, 6, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, new ArrayList<>(), 0, 10));
    applyClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));
    // should move 5 regions from server0 to server1
    List<RegionInfo> movedToServer1 = assertRegionsMoved(plans, server0, server1, 5);
    assertTrue(movedToServer1.containsAll(oldCachedRegions));
  }

  /**
   * The region is fully cached on the plan's destination, so the move should not be delayed.
   */
  @Test
  public void testThrottlingRegionBeyondThreshold() throws Exception {
    RSGroupBasedLoadBalancer balancer = newBalancerWithInternalConf(newCacheAwareConf());

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    internalOf(balancer).regionCacheRatioOnOldServerMap.put("region1",
      cacheRatioOn(server0, 1.0f));
    RegionPlan plan = planForRegion("region1", server1, server0);

    long elapsed = timeThrottle(balancer, plan);
    assertTrue(elapsed < 10, "move was throttled for " + elapsed + " ms");
  }

  /**
   * The region's cache ratio on the destination is below the threshold, so the move should be
   * throttled for the configured interval.
   */
  @Test
  public void testThrottlingRegionBelowThreshold() throws Exception {
    RSGroupBasedLoadBalancer balancer = newBalancerWithThrottling(THROTTLING_MILLIS);

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    internalOf(balancer).regionCacheRatioOnOldServerMap.put("region1",
      cacheRatioOn(server0, 0.1f));
    RegionPlan plan = planForRegion("region1", server1, server0);

    long elapsed = timeThrottle(balancer, plan);
    assertTrue(elapsed >= THROTTLING_MILLIS, "move was throttled for only " + elapsed + " ms");
  }

  /**
   * The region is fully cached, but on a server other than the plan's destination. The cache
   * ratio on the target is therefore unknown, and the move should be throttled.
   */
  @Test
  public void testThrottlingCacheRatioUnknownOnTarget() throws Exception {
    RSGroupBasedLoadBalancer balancer = newBalancerWithThrottling(THROTTLING_MILLIS);

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);
    // region cache ratio is 100% on server2, though server2 is not the target of the region plan
    internalOf(balancer).regionCacheRatioOnOldServerMap.put("region1",
      cacheRatioOn(server2, 1.0f));
    RegionPlan plan = planForRegion("region1", server1, server0);

    long elapsed = timeThrottle(balancer, plan);
    assertTrue(elapsed >= THROTTLING_MILLIS, "move was throttled for only " + elapsed + " ms");
  }

  /** No cache ratio is known for the region at all, so the move should be throttled. */
  @Test
  public void testThrottlingCacheRatioUnknownForRegion() throws Exception {
    RSGroupBasedLoadBalancer balancer = newBalancerWithThrottling(THROTTLING_MILLIS);

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    // No cache ratio available for region1
    RegionPlan plan = planForRegion("region1", server1, server0);

    long elapsed = timeThrottle(balancer, plan);
    assertTrue(elapsed >= THROTTLING_MILLIS, "move was throttled for only " + elapsed + " ms");
  }

  /**
   * One RS hosts every region and two RSes host none. Plans for regions with the higher historical
   * cache ratio (old server1) must precede plans for the lower-ratio regions (old server2).
   */
  @Test
  public void testRegionPlansSortedByCacheRatioOnTarget() throws Exception {
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Simulate one RS with all regions, and two RSes with no regions
    List<RegionInfo> regionsOnServer0 = randomRegions(15);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(0);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Regions [5, 10) of server0 were previously hosted on server1, regions [10, 15) on server2
    List<RegionInfo> oldCachedRegions1 = regionsOnServer0.subList(5, 10);
    List<RegionInfo> oldCachedRegions2 = regionsOnServer0.subList(10, regionsOnServer0.size());

    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    // mock server metrics to set cache ratio as 0 in the RS 0
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    // mock server metrics to set cache ratio as 1 in the RS 1
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions1, 10, 10));
    // mock server metrics to set cache ratio as .8 in the RS 2
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, oldCachedRegions2, 8, 10));
    applyClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));
    assertNotNull(plans, "balancer returned no plans");
    LOG.debug("plans size: {}", plans.size());
    LOG.debug("plans: {}", plans);
    LOG.debug("server1 name: {}", server1.getServerName());

    // assert the plans are in descending order from the most cached to the least cached
    int highCacheCount = 0;
    for (RegionPlan plan : plans) {
      RegionInfo region = plan.getRegionInfo();
      LOG.debug("plan region: {}, target server: {}", region.getEncodedName(),
        plan.getDestination().getServerName());
      if (highCacheCount < 5) {
        LOG.debug("Count: {}", highCacheCount);
        assertTrue(oldCachedRegions1.contains(region));
        assertFalse(oldCachedRegions2.contains(region));
        highCacheCount++;
      } else {
        assertTrue(oldCachedRegions2.contains(region));
        assertFalse(oldCachedRegions1.contains(region));
      }
    }
    // Without this, a balancer that produced too few plans would pass the ordering checks
    // vacuously
    assertEquals(5, highCacheCount, "expected plans for all highly cached regions first");
  }

  /**
   * Half of server0's regions are fully cached on their previous server (server1) and not cached
   * anywhere else. They should be moved back to server1.
   */
  @Test
  public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() throws Exception {
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock the last 5 regions of server0 as previously hosted on server1
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());

    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions, 10, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, new ArrayList<>(), 0, 10));
    applyClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));
    // should move 5 regions from server0 to server1
    List<RegionInfo> movedToServer1 = assertRegionsMoved(plans, server0, server1, 5);
    assertTrue(movedToServer1.containsAll(oldCachedRegions));
  }

  /**
   * Every server reports its current regions fully cached, and server1 also holds historical cache
   * for half of server0's regions. Those regions should be moved back to server1.
   */
  @Test
  public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception {
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock the last 5 regions of server0 as previously hosted on server1
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());

    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      1.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      1.0f, oldCachedRegions, 10, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      1.0f, new ArrayList<>(), 0, 10));
    applyClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));
    // should move 5 regions from server0 to server1
    List<RegionInfo> movedToServer1 = assertRegionsMoved(plans, server0, server1, 5);
    assertTrue(movedToServer1.containsAll(oldCachedRegions));
  }

  /**
   * Regions are partially cached on both their current server and their previous server (server1).
   * The regions with historical cache on server1 should be moved back there.
   */
  @Test
  public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws Exception {
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock the last 5 regions of server0 as previously hosted on server1
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());

    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.2f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions, 6, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      1.0f, new ArrayList<>(), 0, 10));
    applyClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));
    List<RegionInfo> movedToServer1 = assertRegionsMoved(plans, server0, server1, 5);
    assertTrue(movedToServer1.containsAll(oldCachedRegions));
  }

  /**
   * A configuration change delivered while a balance run holds the balancer's monitor must not
   * block on that run. The run in progress keeps seeing the old ratio threshold, and the new
   * threshold is visible once the run has completed.
   */
  @Timeout(60)
  @Test
  public void testConfigUpdateDuringBalance() throws Exception {
    float expectedOldRatioThreshold = 0.8f;
    float expectedNewRatioThreshold = 0.95f;
    long throttlingTimeMillis = 10000;

    Configuration testConf = newCacheAwareConf();
    testConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
    testConf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedOldRatioThreshold);

    // The balancer appears to pick up its initial settings from the shared 'conf' field (via the
    // mocked master), so install the test configuration there. The previous value is restored
    // afterwards so this test's 10s throttling does not leak into other tests.
    Configuration originalConf = conf;
    conf = testConf;
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      RSGroupBasedLoadBalancer balancer = newInitializedBalancer();

      ServerName server0 = servers.get(0);
      ServerName server1 = servers.get(1);
      ServerName server2 = servers.get(2);

      // Setup cluster: all 3 regions on server0 (unbalanced)
      List<RegionInfo> regionsOnServer0 = randomRegions(3);
      List<RegionInfo> regionsOnServer1 = randomRegions(0);
      List<RegionInfo> regionsOnServer2 = randomRegions(0);

      Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
      clusterState.put(server0, regionsOnServer0);
      clusterState.put(server1, regionsOnServer1);
      clusterState.put(server2, regionsOnServer2);

      // Mock metrics: NO cache info for any region = all will be throttled
      Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
      serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0,
        regionsOnServer0, 0.0f, new ArrayList<>(), 0, 10));
      serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1,
        regionsOnServer1, 0.0f, new ArrayList<>(), 0, 10));
      serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2,
        regionsOnServer2, 0.0f, new ArrayList<>(), 0, 10));
      applyClusterMetrics(balancer, serverMetricsMap);

      Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
        loadOfAllTables(clusterState);

      // Verify initial configuration is set correctly
      assertEquals(expectedOldRatioThreshold, internalOf(balancer).ratioThreshold, 0.001f);

      CountDownLatch balanceStarted = new CountDownLatch(1);
      CountDownLatch configUpdateInitiated = new CountDownLatch(1);
      // Written by the balance thread, read after its Future completes
      float[] actualOldRatioThresholdDuringBalance = new float[1];

      // Thread 1: simulate a flow similar to HMaster.balance(), which holds synchronized(balancer)
      // for the duration of the balance run
      Future<Long> balanceFuture = executor.submit(() -> {
        long start = EnvironmentEdgeManager.currentTime();
        try {
          synchronized (balancer) {
            try {
              // Simulate beginning of HMaster.balance(): mark the balancing window open
              balancer.onBalancingStart();
              balanceStarted.countDown();

              List<RegionPlan> plans = balancer.balanceCluster(loadOfAllTable);
              if (plans != null) {
                for (RegionPlan plan : plans) {
                  balancer.throttle(plan);
                }
              }
              // Wait until the config update is initiated while balance is still in progress
              configUpdateInitiated.await();

              // Old config should still be visible during the current balance run
              actualOldRatioThresholdDuringBalance[0] = internalOf(balancer).ratioThreshold;
            } finally {
              balancer.onBalancingComplete();
            }
          }
        } finally {
          // No-op on success; releases the config thread if balancing failed before signalling
          balanceStarted.countDown();
        }
        return EnvironmentEdgeManager.currentTime() - start;
      });

      // Thread 2: simulate update_all_config / onConfigurationChange
      Future<Long> configUpdateFuture = executor.submit(() -> {
        long start = EnvironmentEdgeManager.currentTime();
        try {
          // Wait for balance to start
          balanceStarted.await();

          // Call onConfigurationChange - should NOT hang
          Configuration newConf = new Configuration(testConf);
          newConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, PREFETCH_FILE_LIST);
          newConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
          newConf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD,
            expectedNewRatioThreshold);
          balancer.onConfigurationChange(newConf);
        } finally {
          // Always release the balance thread so a failure here surfaces instead of a hang
          configUpdateInitiated.countDown();
        }
        return EnvironmentEdgeManager.currentTime() - start;
      });

      // Wait for both threads to complete; failures surface as ExecutionException
      long configUpdateDuration = configUpdateFuture.get();
      long balanceDuration = balanceFuture.get();

      // Verify that the config update didn't hang waiting for the balance run
      assertTrue(configUpdateDuration < balanceDuration, "config update took "
        + configUpdateDuration + " ms, balance took " + balanceDuration + " ms");

      // Verify that the ratio threshold used during balance is still the old one
      assertEquals(expectedOldRatioThreshold, actualOldRatioThresholdDuringBalance[0], 0.001f);

      // Verify that the config was updated successfully after balance completed
      assertEquals(expectedNewRatioThreshold, internalOf(balancer).ratioThreshold, 0.001f);
    } finally {
      executor.shutdownNow();
      conf = originalConf;
    }
  }

  /** Applies the settings every balancer in this class needs to {@code c}. */
  private static void configureForCacheAwareBalancer(Configuration c) {
    c.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, PREFETCH_FILE_LIST);
    c.set(GROUP_BALANCER_CLASS_KEY, CacheAwareLoadBalancer.class.getCanonicalName());
  }

  /** Returns a fresh configuration with the cache-aware balancer settings applied. */
  private static Configuration newCacheAwareConf() {
    Configuration c = HBaseConfiguration.create();
    configureForCacheAwareBalancer(c);
    return c;
  }

  /** Creates an {@link RSGroupBasedLoadBalancer} wired to the mocked master and initializes it. */
  private static RSGroupBasedLoadBalancer newInitializedBalancer() throws Exception {
    RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
    balancer.setMasterServices(getMockedMaster());
    balancer.initialize();
    return balancer;
  }

  /**
   * Creates an initialized balancer, then reloads its internal {@link CacheAwareLoadBalancer} from
   * {@code internalConf}.
   */
  private static RSGroupBasedLoadBalancer newBalancerWithInternalConf(Configuration internalConf)
    throws Exception {
    RSGroupBasedLoadBalancer balancer = newInitializedBalancer();
    internalOf(balancer).loadConf(internalConf);
    return balancer;
  }

  /** Creates a balancer whose internal balancer throttles moves by {@code throttlingMillis}. */
  private static RSGroupBasedLoadBalancer newBalancerWithThrottling(long throttlingMillis)
    throws Exception {
    Configuration internalConf = newCacheAwareConf();
    internalConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingMillis);
    return newBalancerWithInternalConf(internalConf);
  }

  /** Returns the internal balancer of {@code balancer}, cast to {@link CacheAwareLoadBalancer}. */
  private static CacheAwareLoadBalancer internalOf(RSGroupBasedLoadBalancer balancer) {
    return (CacheAwareLoadBalancer) balancer.getInternalBalancer();
  }

  /** Builds a (server, cache ratio) entry for {@code regionCacheRatioOnOldServerMap}. */
  private static Pair<ServerName, Float> cacheRatioOn(ServerName server, float ratio) {
    Pair<ServerName, Float> entry = new Pair<>();
    entry.setFirst(server);
    entry.setSecond(ratio);
    return entry;
  }

  /** Builds a plan moving a mocked region with the given encoded name. */
  private static RegionPlan planForRegion(String encodedName, ServerName source,
    ServerName destination) {
    RegionInfo region = mock(RegionInfo.class);
    when(region.getEncodedName()).thenReturn(encodedName);
    return new RegionPlan(region, source, destination);
  }

  /**
   * Calls {@code balancer.throttle(plan)} while holding the balancer's monitor.
   * @return elapsed wall-clock time in milliseconds
   */
  private static long timeThrottle(RSGroupBasedLoadBalancer balancer, RegionPlan plan)
    throws Exception {
    long start = EnvironmentEdgeManager.currentTime();
    synchronized (balancer) {
      balancer.throttle(plan);
    }
    return EnvironmentEdgeManager.currentTime() - start;
  }

  /** Feeds {@code serverMetrics} to {@code balancer} through a mocked {@link ClusterMetrics}. */
  private static void applyClusterMetrics(RSGroupBasedLoadBalancer balancer,
    Map<ServerName, ServerMetrics> serverMetrics) {
    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetrics);
    balancer.updateClusterMetrics(clusterMetrics);
  }

  /**
   * Wraps {@link #mockClusterServersWithTables} and casts its result to the type that
   * {@code balanceCluster} expects. The raw cast is confined to this method.
   */
  @SuppressWarnings({ "rawtypes", "unchecked" })
  private Map<TableName, Map<ServerName, List<RegionInfo>>>
    loadOfAllTables(Map<ServerName, List<RegionInfo>> clusterState) {
    return (Map) mockClusterServersWithTables(clusterState);
  }

  /**
   * Asserts that {@code expectedCount} distinct regions are planned to leave {@code source} and
   * that {@code expectedCount} plans from {@code source} target {@code target}.
   * @return the regions planned to move from {@code source} to {@code target}
   */
  private static List<RegionInfo> assertRegionsMoved(List<RegionPlan> plans, ServerName source,
    ServerName target, int expectedCount) {
    assertNotNull(plans, "balancer returned no plans");
    Set<RegionInfo> movedFromSource = new HashSet<>();
    Map<ServerName, List<RegionInfo>> regionsByTarget = new HashMap<>();
    for (RegionPlan plan : plans) {
      if (plan.getSource().equals(source)) {
        movedFromSource.add(plan.getRegionInfo());
        regionsByTarget.computeIfAbsent(plan.getDestination(), k -> new ArrayList<>())
          .add(plan.getRegionInfo());
      }
    }
    assertEquals(expectedCount, movedFromSource.size());
    List<RegionInfo> movedToTarget = regionsByTarget.get(target);
    assertNotNull(movedToTarget, "no region planned to move from " + source + " to " + target);
    assertEquals(expectedCount, movedToTarget.size());
    return movedToTarget;
  }
}