/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises {@link RSGroupBasedLoadBalancer} configured (via
 * {@code hbase.rsgroup.grouploadbalancer.class}) to use {@link CacheAwareLoadBalancer} as its
 * internal balancer: region placement driven by mocked cache metrics, move throttling, and a
 * configuration change arriving while a balance run is in progress.
 */
@Tag(LargeTests.TAG)
public class TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal
  extends RSGroupableBalancerTestBase {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);

  /** Encoded name given to the mocked region used by the throttling tests. */
  private static final String THROTTLED_REGION = "region1";

  /** Balancer shared by the placement tests; created once in {@link #beforeAllTests()}. */
  private static RSGroupBasedLoadBalancer loadBalancer;

  /**
   * Builds a three-server, single-group fixture and initializes the shared balancer. The settings
   * are written to the inherited {@code conf} field (the original code also declared an unused
   * local named {@code cong} here, which has been dropped).
   */
  @BeforeAll
  public static void beforeAllTests() throws Exception {
    groups = new String[] { RSGroupInfo.DEFAULT_GROUP };
    servers = generateServers(3);
    groupMap = constructGroupInfo(servers, groups);
    tableDescs = constructTableDesc(false);
    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
    conf.set("hbase.rsgroup.grouploadbalancer.class",
      CacheAwareLoadBalancer.class.getCanonicalName());
    loadBalancer = newInitializedBalancer();
  }

  /**
   * Creates a balancer wired to the mocked master and initializes it.
   */
  private static RSGroupBasedLoadBalancer newInitializedBalancer() throws Exception {
    RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
    balancer.setMasterServices(getMockedMaster());
    balancer.initialize();
    return balancer;
  }

  /**
   * Creates an initialized balancer and then loads, into its internal
   * {@link CacheAwareLoadBalancer}, a fresh configuration whose
   * {@link CacheAwareLoadBalancer#MOVE_THROTTLING} is {@code throttleMillis}.
   */
  private static RSGroupBasedLoadBalancer newBalancerWithThrottling(long throttleMillis)
    throws Exception {
    Configuration throttlingConf = HBaseConfiguration.create();
    throttlingConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
    throttlingConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttleMillis);
    throttlingConf.set("hbase.rsgroup.grouploadbalancer.class",
      CacheAwareLoadBalancer.class.getCanonicalName());
    RSGroupBasedLoadBalancer balancer = newInitializedBalancer();
    ((CacheAwareLoadBalancer) balancer.getInternalBalancer()).loadConf(throttlingConf);
    return balancer;
  }

  /**
   * Hands the balancer a mocked {@link ClusterMetrics} whose live-server metrics are
   * {@code serverMetricsMap}.
   */
  private static void publishClusterMetrics(RSGroupBasedLoadBalancer balancer,
    Map<ServerName, ServerMetrics> serverMetricsMap) {
    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
    balancer.updateClusterMetrics(clusterMetrics);
  }

  /**
   * Converts a per-server region assignment into the per-table load map expected by
   * {@code balanceCluster}. The raw cast mirrors the original test code; the suppression is kept
   * local to this helper.
   */
  @SuppressWarnings({ "unchecked", "rawtypes" })
  private Map<TableName, Map<ServerName, List<RegionInfo>>>
    tableLoadFor(Map<ServerName, List<RegionInfo>> clusterState) throws Exception {
    return (Map) mockClusterServersWithTables(clusterState);
  }

  /**
   * Asserts that exactly {@code expectedMoves} distinct regions are planned to leave
   * {@code source} and that exactly {@code expectedMoves} of those plans target {@code target}.
   * @return the regions planned to move from {@code source} to {@code target}
   */
  private static List<RegionInfo> assertMovesFromSourceToTarget(List<RegionPlan> plans,
    ServerName source, ServerName target, int expectedMoves) {
    Set<RegionInfo> regionsLeavingSource = new HashSet<>();
    Map<ServerName, List<RegionInfo>> movesByTarget = new HashMap<>();
    for (RegionPlan plan : plans) {
      if (!plan.getSource().equals(source)) {
        continue;
      }
      regionsLeavingSource.add(plan.getRegionInfo());
      movesByTarget.computeIfAbsent(plan.getDestination(), ignored -> new ArrayList<>())
        .add(plan.getRegionInfo());
    }
    assertEquals(expectedMoves, regionsLeavingSource.size());
    // An absent target yields an empty list so the failure is an assertion, not an NPE.
    List<RegionInfo> movedToTarget = movesByTarget.getOrDefault(target, new ArrayList<>());
    assertEquals(expectedMoves, movedToTarget.size());
    return movedToTarget;
  }

  /**
   * Builds a plan moving a mocked region, whose encoded name is {@link #THROTTLED_REGION}, from
   * {@code source} to {@code destination}.
   */
  private static RegionPlan mockedRegionPlan(ServerName source, ServerName destination) {
    RegionInfo mockedInfo = mock(RegionInfo.class);
    when(mockedInfo.getEncodedName()).thenReturn(THROTTLED_REGION);
    return new RegionPlan(mockedInfo, source, destination);
  }

  /**
   * Calls {@code balancer.throttle(plan)} while holding the balancer's monitor.
   * @return elapsed milliseconds as measured by {@link EnvironmentEdgeManager}
   */
  private static long timeThrottle(RSGroupBasedLoadBalancer balancer, RegionPlan plan)
    throws Exception {
    long startTime = EnvironmentEdgeManager.currentTime();
    synchronized (balancer) {
      balancer.throttle(plan);
    }
    return EnvironmentEdgeManager.currentTime() - startTime;
  }

  @Test
  public void testRegionsNotCachedOnOldServerAndCurrentServer() throws Exception {
    // The regions are not cached on old server as well as the current server. This causes
    // skewness in the region allocation which should be fixed by the balancer
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Skewed assignment: 10 regions on server0, none on server1, 5 on server2
    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock cluster metrics
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, new ArrayList<>(), 0, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(tableLoadFor(clusterState));

    // should move 5 regions from server0 to server1
    assertMovesFromSourceToTarget(plans, server0, server1, 5);
  }

  @Test
  public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws Exception {
    // The regions are partially cached on old server but not cached on the current server
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Simulate that the regions previously hosted by server1 are now hosted on server0
    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock regions at indices 5..8 of server0 (the subList end index is exclusive, so this is
    // 4 regions, not 5) as previously hosted on server1
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);

    // Mock cluster metrics
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions, 6, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, new ArrayList<>(), 0, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(tableLoadFor(clusterState));

    // should move 5 regions from server0 to server1
    List<RegionInfo> movedToServer1 = assertMovesFromSourceToTarget(plans, server0, server1, 5);
    assertTrue(movedToServer1.containsAll(oldCachedRegions));
  }

  @Test
  public void testThrottlingRegionBeyondThreshold() throws Exception {
    // No throttling configuration is loaded into this balancer (the original test built a local
    // Configuration here but never passed it to anything).
    RSGroupBasedLoadBalancer balancer = newInitializedBalancer();

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    // Region is recorded as 100% cached on server0, which is the destination of the plan
    Pair<ServerName, Float> regionRatio = new Pair<>();
    regionRatio.setFirst(server0);
    regionRatio.setSecond(1.0f);
    CacheAwareLoadBalancer internalBalancer =
      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
    internalBalancer.regionCacheRatioOnOldServerMap.put(THROTTLED_REGION, regionRatio);

    long elapsedMillis = timeThrottle(balancer, mockedRegionPlan(server1, server0));
    assertTrue(elapsedMillis < 10);
  }

  @Test
  public void testThrottlingRegionBelowThreshold() throws Exception {
    RSGroupBasedLoadBalancer balancer = newBalancerWithThrottling(100);

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    // Region is recorded as only 10% cached on server0, which is the destination of the plan
    Pair<ServerName, Float> regionRatio = new Pair<>();
    regionRatio.setFirst(server0);
    regionRatio.setSecond(0.1f);
    CacheAwareLoadBalancer internalBalancer =
      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
    internalBalancer.regionCacheRatioOnOldServerMap.put(THROTTLED_REGION, regionRatio);

    long elapsedMillis = timeThrottle(balancer, mockedRegionPlan(server1, server0));
    assertTrue(elapsedMillis >= 100);
  }

  @Test
  public void testThrottlingCacheRatioUnknownOnTarget() throws Exception {
    RSGroupBasedLoadBalancer balancer = newBalancerWithThrottling(100);

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName otherServer = servers.get(2);
    // setting region cache ratio 100% on the third server, though this is not the target in the
    // region plan
    Pair<ServerName, Float> regionRatio = new Pair<>();
    regionRatio.setFirst(otherServer);
    regionRatio.setSecond(1.0f);
    CacheAwareLoadBalancer internalBalancer =
      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
    internalBalancer.regionCacheRatioOnOldServerMap.put(THROTTLED_REGION, regionRatio);

    long elapsedMillis = timeThrottle(balancer, mockedRegionPlan(server1, server0));
    assertTrue(elapsedMillis >= 100);
  }

  @Test
  public void testThrottlingCacheRatioUnknownForRegion() throws Exception {
    RSGroupBasedLoadBalancer balancer = newBalancerWithThrottling(100);

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    // No cache ratio available for region1
    long elapsedMillis = timeThrottle(balancer, mockedRegionPlan(server1, server0));
    assertTrue(elapsedMillis >= 100);
  }

  @Test
  public void testRegionPlansSortedByCacheRatioOnTarget() throws Exception {
    // The regions are fully cached on old server
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Simulate one RS with all regions, and two RSes with no regions
    List<RegionInfo> regionsOnServer0 = randomRegions(15);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(0);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock cluster metrics
    // Regions 5..9 of server0 were previously hosted on server1, regions 10..14 on server2
    List<RegionInfo> oldCachedRegions1 = regionsOnServer0.subList(5, 10);
    List<RegionInfo> oldCachedRegions2 = regionsOnServer0.subList(10, regionsOnServer0.size());
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    // mock server metrics to set cache ratio as 0 in the RS 0
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    // mock server metrics to set cache ratio as 1 in the RS 1
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions1, 10, 10));
    // mock server metrics to set cache ratio as .8 in the RS 2
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, oldCachedRegions2, 8, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(tableLoadFor(clusterState));
    LOG.debug("plans size: {}", plans.size());
    LOG.debug("plans: {}", plans);
    LOG.debug("server1 name: {}", server1.getServerName());

    // assert the plans are in descending order from the most cached to the least cached: the
    // first five plans must be for the fully cached group, every later plan for the other group
    int highCacheCount = 0;
    for (RegionPlan plan : plans) {
      RegionInfo region = plan.getRegionInfo();
      LOG.debug("plan region: {}, target server: {}", region.getEncodedName(),
        plan.getDestination().getServerName());
      if (highCacheCount < 5) {
        LOG.debug("Count: {}", highCacheCount);
        assertTrue(oldCachedRegions1.contains(region));
        assertFalse(oldCachedRegions2.contains(region));
        highCacheCount++;
      } else {
        assertTrue(oldCachedRegions2.contains(region));
        assertFalse(oldCachedRegions1.contains(region));
      }
    }
  }

  @Test
  public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() throws Exception {
    // The regions are fully cached on old server
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Simulate that the regions previously hosted by server1 are now hosted on server0
    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock regions at indices 5..8 of server0 (the subList end index is exclusive, so this is
    // 4 regions, not 5) as previously hosted on server1
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);

    // Mock cluster metrics
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions, 10, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, new ArrayList<>(), 0, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(tableLoadFor(clusterState));

    // should move 5 regions from server0 to server1
    List<RegionInfo> movedToServer1 = assertMovesFromSourceToTarget(plans, server0, server1, 5);
    assertTrue(movedToServer1.containsAll(oldCachedRegions));
  }

  @Test
  public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception {
    // The regions are fully cached on old server
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Simulate that the regions previously hosted by server1 are now hosted on server0
    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock regions at indices 5..8 of server0 (the subList end index is exclusive, so this is
    // 4 regions, not 5) as previously hosted on server1
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);

    // Mock cluster metrics
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      1.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      1.0f, oldCachedRegions, 10, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      1.0f, new ArrayList<>(), 0, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(tableLoadFor(clusterState));

    // should move 5 regions from server0 to server1
    List<RegionInfo> movedToServer1 = assertMovesFromSourceToTarget(plans, server0, server1, 5);
    assertTrue(movedToServer1.containsAll(oldCachedRegions));
  }

  @Test
  public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws Exception {
    // The regions are partially cached on old server
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Simulate that the regions previously hosted by server1 are now hosted on server0
    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock regions at indices 5..8 of server0 (the subList end index is exclusive, so this is
    // 4 regions, not 5) as previously hosted on server1
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);

    // Mock cluster metrics
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.2f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions, 6, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      1.0f, new ArrayList<>(), 0, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(tableLoadFor(clusterState));

    // should move 5 regions from server0 to server1
    List<RegionInfo> movedToServer1 = assertMovesFromSourceToTarget(plans, server0, server1, 5);
    assertTrue(movedToServer1.containsAll(oldCachedRegions));
  }

  /**
   * Runs a balance (holding the balancer's monitor, with throttled moves) on one thread while a
   * second thread calls {@code onConfigurationChange}. Verifies that the configuration call
   * returns before the balance finishes, that the balance run still observes the old ratio
   * threshold, and that the new threshold is in effect once the balance has completed.
   */
  @Timeout(60)
  @Test
  public void testConfigUpdateDuringBalance() throws Exception {
    float expectedOldRatioThreshold = 0.8f;
    float expectedNewRatioThreshold = 0.95f;
    long throttlingTimeMillis = 10000;

    // NOTE(review): this replaces the inherited shared conf field and does not restore it, so
    // balancers created afterwards via getMockedMaster() presumably see these values - confirm
    // that this is intended.
    conf = HBaseConfiguration.create();
    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
    conf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedOldRatioThreshold);
    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
    conf.set("hbase.rsgroup.grouploadbalancer.class",
      CacheAwareLoadBalancer.class.getCanonicalName());

    RSGroupBasedLoadBalancer balancer = newInitializedBalancer();

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Setup cluster: all 3 regions on server0 (unbalanced)
    List<RegionInfo> regionsOnServer0 = randomRegions(3);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(0);

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock metrics: NO cache info for any region = all will be throttled
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, new ArrayList<>(), 0, 10));
    publishClusterMetrics(balancer, serverMetricsMap);

    final Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
      tableLoadFor(clusterState);

    // Verify initial configuration is set correctly
    CacheAwareLoadBalancer internalBalancer =
      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
    assertEquals(expectedOldRatioThreshold, internalBalancer.ratioThreshold, 0.001f);

    CountDownLatch balanceStarted = new CountDownLatch(1);
    CountDownLatch configUpdateInitiated = new CountDownLatch(1);

    // Ratio threshold observed by the balance thread; a one-element array so the lambda can
    // write to it
    float[] actualOldRatioThresholdDuringBalance = new float[1];

    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      // Thread 1: simulate a flow similar to HMaster.balance(), which holds synchronized(balancer)
      // for the duration of the balance
      Future<Long> balanceFuture = executor.submit(() -> {
        long start = EnvironmentEdgeManager.currentTime();
        synchronized (balancer) {
          try {
            // Simulate beginning of HMaster.balance(): mark balancing window open
            balancer.onBalancingStart();
            balanceStarted.countDown();

            List<RegionPlan> plans = balancer.balanceCluster(loadOfAllTable);
            if (plans != null) {
              for (RegionPlan plan : plans) {
                balancer.throttle(plan);
              }
            }
            // Wait until config update is initiated while balance is still in progress
            configUpdateInitiated.await();

            // Old config should still be visible during current balance run
            CacheAwareLoadBalancer currentInternal =
              (CacheAwareLoadBalancer) balancer.getInternalBalancer();
            actualOldRatioThresholdDuringBalance[0] = currentInternal.ratioThreshold;
          } finally {
            balancer.onBalancingComplete();
          }
        }
        return EnvironmentEdgeManager.currentTime() - start;
      });

      // Thread 2: simulate update_all_config / onConfigurationChange
      Future<Long> configUpdateFuture = executor.submit(() -> {
        // Same clock as the balance thread so the two durations are comparable
        long start = EnvironmentEdgeManager.currentTime();
        // Wait for balance to start
        balanceStarted.await();

        // Call onConfigurationChange - should NOT hang
        Configuration newConf = new Configuration(conf);
        newConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
        newConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
        newConf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedNewRatioThreshold);
        balancer.onConfigurationChange(newConf);
        configUpdateInitiated.countDown();

        return EnvironmentEdgeManager.currentTime() - start;
      });

      // Wait for both threads to complete
      long configUpdateDuration = configUpdateFuture.get();
      long balanceDuration = balanceFuture.get();

      // Verify that config update didn't hang/timeout waiting for balance
      assertTrue(configUpdateDuration < balanceDuration);

      // Verify that the ratio threshold used during balance is still the old one
      assertEquals(expectedOldRatioThreshold, actualOldRatioThresholdDuringBalance[0], 0.001f);

      // Verify that config updated successfully after balance completed
      internalBalancer = (CacheAwareLoadBalancer) balancer.getInternalBalancer();
      assertEquals(expectedNewRatioThreshold, internalBalancer.ratioThreshold, 0.001f);
    } finally {
      executor.shutdownNow();
    }
  }
}