/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises {@link RSGroupBasedLoadBalancer} when the
 * {@code hbase.rsgroup.grouploadbalancer.class} setting selects {@link CacheAwareLoadBalancer} as
 * its internal balancer: region plan generation for several cache-ratio scenarios, move
 * throttling, and a configuration change arriving while a balance run is in progress.
 */
@Category(LargeTests.class)
public class TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal
  extends RSGroupableBalancerTestBase {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
    .forClass(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);

  /** Configuration key naming the balancer class the RSGroup balancer delegates to. */
  private static final String GROUP_BALANCER_CLASS_KEY = "hbase.rsgroup.grouploadbalancer.class";

  /** Value used for {@link HConstants#BUCKET_CACHE_PERSISTENT_PATH_KEY} in every test. */
  private static final String PERSISTENT_CACHE_PATH = "prefetch_file_list";

  /** Encoded name given to the mocked region used by the throttling tests. */
  private static final String THROTTLED_REGION = "region1";

  /** Balancer shared by the plan-generation tests; created once in {@link #beforeAllTests()}. */
  private static RSGroupBasedLoadBalancer loadBalancer;

  /**
   * Builds the three-server, single-group fixture and initializes the shared balancer with
   * {@link CacheAwareLoadBalancer} as the internal balancer.
   */
  @BeforeClass
  public static void beforeAllTests() throws Exception {
    groups = new String[] { RSGroupInfo.DEFAULT_GROUP };
    servers = generateServers(3);
    groupMap = constructGroupInfo(servers, groups);
    tableDescs = constructTableDesc(false);
    // The settings go on the inherited shared configuration; no separate instance is needed.
    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, PERSISTENT_CACHE_PATH);
    conf.set(GROUP_BALANCER_CLASS_KEY, CacheAwareLoadBalancer.class.getCanonicalName());
    loadBalancer = new RSGroupBasedLoadBalancer();
    loadBalancer.setMasterServices(getMockedMaster());
    loadBalancer.initialize();
  }

  /** Creates and initializes a fresh balancer backed by the mocked master. */
  private static RSGroupBasedLoadBalancer newInitializedBalancer() throws Exception {
    RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
    balancer.setMasterServices(getMockedMaster());
    balancer.initialize();
    return balancer;
  }

  /**
   * Creates a fresh balancer and loads a configuration with the given
   * {@link CacheAwareLoadBalancer#MOVE_THROTTLING} value into its internal balancer.
   * @param throttlingMillis value stored under {@link CacheAwareLoadBalancer#MOVE_THROTTLING}
   */
  private static RSGroupBasedLoadBalancer newBalancerWithMoveThrottling(long throttlingMillis)
    throws Exception {
    Configuration throttlingConf = HBaseConfiguration.create();
    throttlingConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, PERSISTENT_CACHE_PATH);
    throttlingConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingMillis);
    throttlingConf.set(GROUP_BALANCER_CLASS_KEY, CacheAwareLoadBalancer.class.getCanonicalName());
    RSGroupBasedLoadBalancer balancer = newInitializedBalancer();
    internalBalancerOf(balancer).loadConf(throttlingConf);
    return balancer;
  }

  /** Returns the internal balancer of {@code balancer} cast to {@link CacheAwareLoadBalancer}. */
  private static CacheAwareLoadBalancer internalBalancerOf(RSGroupBasedLoadBalancer balancer) {
    return (CacheAwareLoadBalancer) balancer.getInternalBalancer();
  }

  /** Builds the (server, ratio) pair stored in {@code regionCacheRatioOnOldServerMap}. */
  private static Pair<ServerName, Float> cacheRatioOn(ServerName server, float ratio) {
    Pair<ServerName, Float> serverAndRatio = new Pair<>();
    serverAndRatio.setFirst(server);
    serverAndRatio.setSecond(ratio);
    return serverAndRatio;
  }

  /** Creates a plan moving a mocked region with the given encoded name. */
  private static RegionPlan planForMockedRegion(String encodedName, ServerName source,
    ServerName destination) {
    RegionInfo region = mock(RegionInfo.class);
    when(region.getEncodedName()).thenReturn(encodedName);
    return new RegionPlan(region, source, destination);
  }

  /**
   * Calls {@code balancer.throttle(plan)} while holding the balancer's monitor and returns the
   * elapsed time in milliseconds as reported by {@link EnvironmentEdgeManager}.
   */
  private static long timeThrottle(RSGroupBasedLoadBalancer balancer, RegionPlan plan)
    throws Exception {
    long startTime = EnvironmentEdgeManager.currentTime();
    synchronized (balancer) {
      balancer.throttle(plan);
    }
    return EnvironmentEdgeManager.currentTime() - startTime;
  }

  /** Feeds {@code serverMetricsMap} to the balancer as the live-server metrics of a mock. */
  private static void publishClusterMetrics(RSGroupBasedLoadBalancer balancer,
    Map<ServerName, ServerMetrics> serverMetricsMap) throws Exception {
    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
    balancer.updateClusterMetrics(clusterMetrics);
  }

  /**
   * Converts a per-server cluster state into the per-table load map expected by
   * {@code balanceCluster}. The raw cast is confined to this helper so the unchecked warning is
   * suppressed in exactly one place.
   */
  @SuppressWarnings({ "unchecked", "rawtypes" })
  private Map<TableName, Map<ServerName, List<RegionInfo>>>
    loadOfAllTables(Map<ServerName, List<RegionInfo>> clusterState) throws Exception {
    return (Map) mockClusterServersWithTables(clusterState);
  }

  /**
   * Asserts how many distinct regions the plans move away from {@code source}, how many of those
   * moves land on {@code expectedTarget}, and that the moves to {@code expectedTarget} include
   * every region in {@code expectedOnTarget}.
   * @param plans            plans returned by the balancer
   * @param source           server whose outgoing moves are inspected
   * @param expectedTarget   server expected to receive the moves
   * @param expectedMoves    expected number of moves, both in total and onto the target
   * @param expectedOnTarget regions that must be among the moves to the target; may be empty
   */
  private static void assertMovesFromSource(List<RegionPlan> plans, ServerName source,
    ServerName expectedTarget, int expectedMoves, List<RegionInfo> expectedOnTarget) {
    assertTrue("Expected the balancer to return region plans", plans != null);
    Set<RegionInfo> movedRegions = new HashSet<>();
    Map<ServerName, List<RegionInfo>> regionsByTarget = new HashMap<>();
    for (RegionPlan plan : plans) {
      if (plan.getSource().equals(source)) {
        movedRegions.add(plan.getRegionInfo());
        regionsByTarget.computeIfAbsent(plan.getDestination(), target -> new ArrayList<>())
          .add(plan.getRegionInfo());
      }
    }
    assertEquals(expectedMoves, movedRegions.size());
    // An absent target counts as zero moves, so the failure is an assertion rather than an NPE.
    List<RegionInfo> movedToTarget =
      regionsByTarget.getOrDefault(expectedTarget, new ArrayList<>());
    assertEquals("Unexpected number of regions moved to " + expectedTarget, expectedMoves,
      movedToTarget.size());
    assertTrue(movedToTarget.containsAll(expectedOnTarget));
  }

  @Test
  public void testRegionsNotCachedOnOldServerAndCurrentServer() throws Exception {
    // The regions are not cached on old server as well as the current server. This causes
    // skewness in the region allocation which should be fixed by the balancer

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Skewed layout: server0 is overloaded, server1 is empty
    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Mock cluster metrics
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, new ArrayList<>(), 0, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));

    // should move 5 regions from server0 to server1
    assertMovesFromSource(plans, server0, server1, 5, new ArrayList<>());
  }

  @Test
  public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws Exception {
    // The regions are partially cached on old server but not cached on the current server

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Simulate that the regions previously hosted by server1 are now hosted on server0
    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Regions from server0 mocked as previously hosted on server1. The end index is exclusive,
    // so subList(5, size - 1) skips the last region: indexes 5..8, i.e. four regions when
    // randomRegions(10) yields ten.
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);

    // Mock cluster metrics
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions, 6, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, new ArrayList<>(), 0, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));

    // should move 5 regions from server0 to server1
    assertMovesFromSource(plans, server0, server1, 5, oldCachedRegions);
  }

  @Test
  public void testThrottlingRegionBeyondThreshold() throws Exception {
    RSGroupBasedLoadBalancer balancer = newInitializedBalancer();

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    // Cache ratio of 1.0 recorded for the region on server0, the destination of the plan
    internalBalancerOf(balancer).regionCacheRatioOnOldServerMap.put(THROTTLED_REGION,
      cacheRatioOn(server0, 1.0f));
    RegionPlan plan = planForMockedRegion(THROTTLED_REGION, server1, server0);

    // The move is expected to go through without a throttling delay
    assertTrue(timeThrottle(balancer, plan) < 10);
  }

  @Test
  public void testThrottlingRegionBelowThreshold() throws Exception {
    RSGroupBasedLoadBalancer balancer = newBalancerWithMoveThrottling(100);

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    // Cache ratio of only 0.1 recorded for the region on server0, the destination of the plan
    internalBalancerOf(balancer).regionCacheRatioOnOldServerMap.put(THROTTLED_REGION,
      cacheRatioOn(server0, 0.1f));
    RegionPlan plan = planForMockedRegion(THROTTLED_REGION, server1, server0);

    // The move is expected to be delayed by at least the configured 100 ms
    assertTrue(timeThrottle(balancer, plan) >= 100);
  }

  @Test
  public void testThrottlingCacheRatioUnknownOnTarget() throws Exception {
    RSGroupBasedLoadBalancer balancer = newBalancerWithMoveThrottling(100);

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server3 = servers.get(2);
    // setting region cache ratio 100% on server 3, though this is not the target in the region plan
    internalBalancerOf(balancer).regionCacheRatioOnOldServerMap.put(THROTTLED_REGION,
      cacheRatioOn(server3, 1.0f));
    RegionPlan plan = planForMockedRegion(THROTTLED_REGION, server1, server0);

    assertTrue(timeThrottle(balancer, plan) >= 100);
  }

  @Test
  public void testThrottlingCacheRatioUnknownForRegion() throws Exception {
    RSGroupBasedLoadBalancer balancer = newBalancerWithMoveThrottling(100);

    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    // No cache ratio available for region1
    RegionPlan plan = planForMockedRegion(THROTTLED_REGION, server1, server0);

    assertTrue(timeThrottle(balancer, plan) >= 100);
  }

  @Test
  public void testRegionPlansSortedByCacheRatioOnTarget() throws Exception {
    // The regions are fully cached on old server

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Simulate one RS with all regions, and two RSes with no regions
    List<RegionInfo> regionsOnServer0 = randomRegions(15);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(0);

    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Two disjoint slices of server0's regions: indexes 5..9 are mocked as previously cached on
    // server1, indexes 10 and up as previously cached on server2
    List<RegionInfo> oldCachedRegions1 = regionsOnServer0.subList(5, 10);
    List<RegionInfo> oldCachedRegions2 = regionsOnServer0.subList(10, regionsOnServer0.size());

    // Mock cluster metrics
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    // mock server metrics to set cache ratio as 0 in the RS 0
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    // mock server metrics to set cache ratio as 1 in the RS 1
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions1, 10, 10));
    // mock server metrics to set cache ratio as .8 in the RS 2
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, oldCachedRegions2, 8, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));
    LOG.debug("plans size: {}", plans.size());
    LOG.debug("plans: {}", plans);
    LOG.debug("server1 name: {}", server1.getServerName());

    // assert the plans are in descending order from the most cached to the least cached:
    // the first five plans must be for the server1 slice, every later plan for the server2 slice
    int highCacheCount = 0;
    for (RegionPlan plan : plans) {
      LOG.debug("plan region: {}, target server: {}", plan.getRegionInfo().getEncodedName(),
        plan.getDestination().getServerName());
      if (highCacheCount < 5) {
        LOG.debug("Count: {}", highCacheCount);
        assertTrue(oldCachedRegions1.contains(plan.getRegionInfo()));
        assertFalse(oldCachedRegions2.contains(plan.getRegionInfo()));
        highCacheCount++;
      } else {
        assertTrue(oldCachedRegions2.contains(plan.getRegionInfo()));
        assertFalse(oldCachedRegions1.contains(plan.getRegionInfo()));
      }
    }
  }

  @Test
  public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() throws Exception {
    // The regions are fully cached on old server

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Simulate that the regions previously hosted by server1 are now hosted on server0
    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Regions from server0 mocked as previously hosted on server1. The end index is exclusive,
    // so subList(5, size - 1) skips the last region: indexes 5..8, i.e. four regions when
    // randomRegions(10) yields ten.
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);

    // Mock cluster metrics
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions, 10, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      0.0f, new ArrayList<>(), 0, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));

    // should move 5 regions from server0 to server1
    assertMovesFromSource(plans, server0, server1, 5, oldCachedRegions);
  }

  @Test
  public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception {
    // The regions are fully cached on old server

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Simulate that the regions previously hosted by server1 are now hosted on server0
    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Regions from server0 mocked as previously hosted on server1. The end index is exclusive,
    // so subList(5, size - 1) skips the last region: indexes 5..8, i.e. four regions when
    // randomRegions(10) yields ten.
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);

    // Mock cluster metrics
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      1.0f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      1.0f, oldCachedRegions, 10, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      1.0f, new ArrayList<>(), 0, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));

    // should move 5 regions from server0 to server1
    assertMovesFromSource(plans, server0, server1, 5, oldCachedRegions);
  }

  @Test
  public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws Exception {
    // The regions are partially cached on old server

    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
    ServerName server0 = servers.get(0);
    ServerName server1 = servers.get(1);
    ServerName server2 = servers.get(2);

    // Simulate that the regions previously hosted by server1 are now hosted on server0
    List<RegionInfo> regionsOnServer0 = randomRegions(10);
    List<RegionInfo> regionsOnServer1 = randomRegions(0);
    List<RegionInfo> regionsOnServer2 = randomRegions(5);

    clusterState.put(server0, regionsOnServer0);
    clusterState.put(server1, regionsOnServer1);
    clusterState.put(server2, regionsOnServer2);

    // Regions from server0 mocked as previously hosted on server1. The end index is exclusive,
    // so subList(5, size - 1) skips the last region: indexes 5..8, i.e. four regions when
    // randomRegions(10) yields ten.
    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);

    // Mock cluster metrics
    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
      0.2f, new ArrayList<>(), 0, 10));
    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
      0.0f, oldCachedRegions, 6, 10));
    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
      1.0f, new ArrayList<>(), 0, 10));
    publishClusterMetrics(loadBalancer, serverMetricsMap);

    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTables(clusterState));

    // should move 5 regions from server0 to server1
    assertMovesFromSource(plans, server0, server1, 5, oldCachedRegions);
  }

  /**
   * Runs a balance on one thread while another thread calls {@code onConfigurationChange}, and
   * checks that the configuration call returns before the balance run ends, that the ratio
   * threshold observed inside the balance run is still the old one, and that the new threshold
   * is in effect once the run has completed.
   */
  @Test(timeout = 60000)
  public void testConfigUpdateDuringBalance() throws Exception {
    float expectedOldRatioThreshold = 0.8f;
    float expectedNewRatioThreshold = 0.95f;
    long throttlingTimeMillis = 10000;

    // This test replaces the shared configuration; remember the original so it can be put back
    // and tests running afterwards do not inherit the long throttling time.
    // NOTE(review): presumably getMockedMaster() hands this shared configuration to the
    // balancer - confirm against RSGroupableBalancerTestBase.
    Configuration originalConf = conf;
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      conf = HBaseConfiguration.create();
      conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
      conf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedOldRatioThreshold);
      conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, PERSISTENT_CACHE_PATH);
      conf.set(GROUP_BALANCER_CLASS_KEY, CacheAwareLoadBalancer.class.getCanonicalName());

      RSGroupBasedLoadBalancer balancer = newInitializedBalancer();

      Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
      ServerName server0 = servers.get(0);
      ServerName server1 = servers.get(1);
      ServerName server2 = servers.get(2);

      // Setup cluster: all 3 regions on server0 (unbalanced)
      List<RegionInfo> regionsOnServer0 = randomRegions(3);
      List<RegionInfo> regionsOnServer1 = randomRegions(0);
      List<RegionInfo> regionsOnServer2 = randomRegions(0);

      clusterState.put(server0, regionsOnServer0);
      clusterState.put(server1, regionsOnServer1);
      clusterState.put(server2, regionsOnServer2);

      // Mock metrics: NO cache info for any region = all will be throttled
      Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
      serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0,
        regionsOnServer0, 0.0f, new ArrayList<>(), 0, 10));
      serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1,
        regionsOnServer1, 0.0f, new ArrayList<>(), 0, 10));
      serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2,
        regionsOnServer2, 0.0f, new ArrayList<>(), 0, 10));
      publishClusterMetrics(balancer, serverMetricsMap);

      final Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
        loadOfAllTables(clusterState);

      // Verify initial configuration is set correctly
      assertEquals(expectedOldRatioThreshold, internalBalancerOf(balancer).ratioThreshold, 0.001f);

      CountDownLatch balanceStarted = new CountDownLatch(1);
      CountDownLatch configUpdateInitiated = new CountDownLatch(1);

      // Written by the balance thread, read by this thread after Future.get()
      float[] actualOldRatioThresholdDuringBalance = new float[1];

      // Thread 1: Simulate similar flow to HMaster.balance() which holds synchronized(balancer)
      // for the duration of balance
      Future<Long> balanceFuture = executor.submit(() -> {
        try {
          long start = EnvironmentEdgeManager.currentTime();
          synchronized (balancer) {
            try {
              // Simulate beginning of HMaster.balance(): mark balancing window open
              balancer.onBalancingStart();
              balanceStarted.countDown();

              List<RegionPlan> plans = balancer.balanceCluster(loadOfAllTable);
              if (plans != null) {
                for (RegionPlan plan : plans) {
                  balancer.throttle(plan);
                }
              }
              // Wait until config update is initiated while balance is still in progress
              configUpdateInitiated.await();

              // Old config should still be visible during current balance run
              actualOldRatioThresholdDuringBalance[0] = internalBalancerOf(balancer).ratioThreshold;
            } finally {
              balancer.onBalancingComplete();
            }
          }
          return EnvironmentEdgeManager.currentTime() - start;
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });

      // Thread 2: Simulate update_all_config / onConfigurationChange
      Future<Long> configUpdateFuture = executor.submit(() -> {
        try {
          // Same clock source as the balance thread, since the two durations are compared
          long startTime = EnvironmentEdgeManager.currentTime();
          // Wait for balance to start
          balanceStarted.await();

          // Call onConfigurationChange - should NOT hang
          Configuration newConf = new Configuration(conf);
          newConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, PERSISTENT_CACHE_PATH);
          newConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
          newConf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD,
            expectedNewRatioThreshold);
          balancer.onConfigurationChange(newConf);
          configUpdateInitiated.countDown();

          return EnvironmentEdgeManager.currentTime() - startTime;
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });

      // Wait for both threads to complete
      long configUpdateDuration = configUpdateFuture.get();
      long balanceDuration = balanceFuture.get();

      // Verify that config update didn't hang/timeout waiting for balance
      assertTrue(configUpdateDuration < balanceDuration);

      // Verify that ratio threshold used during balance is still the old one
      assertEquals(expectedOldRatioThreshold, actualOldRatioThresholdDuringBalance[0], 0.001f);

      // Verify that config updated successfully after balance completed
      assertEquals(expectedNewRatioThreshold, internalBalancerOf(balancer).ratioThreshold, 0.001f);
    } finally {
      executor.shutdownNow();
      conf = originalConf;
    }
  }
}