001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.TreeMap;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.Future;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.ClusterMetrics;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.ServerMetrics;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.master.RegionPlan;
046import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
047import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.hadoop.hbase.util.Pair;
051import org.junit.jupiter.api.BeforeAll;
052import org.junit.jupiter.api.Tag;
053import org.junit.jupiter.api.Test;
054import org.junit.jupiter.api.Timeout;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058@Tag(LargeTests.TAG)
059public class TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal
060  extends RSGroupableBalancerTestBase {
061
062  private static final Logger LOG =
063    LoggerFactory.getLogger(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);
064
065  private static RSGroupBasedLoadBalancer loadBalancer;
066
067  @BeforeAll
068  public static void beforeAllTests() throws Exception {
069    groups = new String[] { RSGroupInfo.DEFAULT_GROUP };
070    servers = generateServers(3);
071    groupMap = constructGroupInfo(servers, groups);
072    tableDescs = constructTableDesc(false);
073    Configuration cong = HBaseConfiguration.create();
074    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
075    conf.set("hbase.rsgroup.grouploadbalancer.class",
076      CacheAwareLoadBalancer.class.getCanonicalName());
077    loadBalancer = new RSGroupBasedLoadBalancer();
078    loadBalancer.setMasterServices(getMockedMaster());
079    loadBalancer.initialize();
080  }
081
082  @Test
083  public void testRegionsNotCachedOnOldServerAndCurrentServer() throws Exception {
084    // The regions are not cached on old server as well as the current server. This causes
085    // skewness in the region allocation which should be fixed by the balancer
086
087    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
088    ServerName server0 = servers.get(0);
089    ServerName server1 = servers.get(1);
090    ServerName server2 = servers.get(2);
091
092    // Simulate that the regions previously hosted by server1 are now hosted on server0
093    List<RegionInfo> regionsOnServer0 = randomRegions(10);
094    List<RegionInfo> regionsOnServer1 = randomRegions(0);
095    List<RegionInfo> regionsOnServer2 = randomRegions(5);
096
097    clusterState.put(server0, regionsOnServer0);
098    clusterState.put(server1, regionsOnServer1);
099    clusterState.put(server2, regionsOnServer2);
100
101    // Mock cluster metrics
102    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
103    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
104      0.0f, new ArrayList<>(), 0, 10));
105    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
106      0.0f, new ArrayList<>(), 0, 10));
107    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
108      0.0f, new ArrayList<>(), 0, 10));
109    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
110    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
111    loadBalancer.updateClusterMetrics(clusterMetrics);
112
113    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
114      (Map) mockClusterServersWithTables(clusterState);
115    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
116    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
117    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
118    for (RegionPlan plan : plans) {
119      if (plan.getSource().equals(server0)) {
120        regionsMovedFromServer0.add(plan.getRegionInfo());
121        if (!targetServers.containsKey(plan.getDestination())) {
122          targetServers.put(plan.getDestination(), new ArrayList<>());
123        }
124        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
125      }
126    }
127    // should move 5 regions from server0 to server 1
128    assertEquals(5, regionsMovedFromServer0.size());
129    assertEquals(5, targetServers.get(server1).size());
130  }
131
132  @Test
133  public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws Exception {
134    // The regions are partially cached on old server but not cached on the current server
135
136    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
137    ServerName server0 = servers.get(0);
138    ServerName server1 = servers.get(1);
139    ServerName server2 = servers.get(2);
140
141    // Simulate that the regions previously hosted by server1 are now hosted on server0
142    List<RegionInfo> regionsOnServer0 = randomRegions(10);
143    List<RegionInfo> regionsOnServer1 = randomRegions(0);
144    List<RegionInfo> regionsOnServer2 = randomRegions(5);
145
146    clusterState.put(server0, regionsOnServer0);
147    clusterState.put(server1, regionsOnServer1);
148    clusterState.put(server2, regionsOnServer2);
149
150    // Mock cluster metrics
151
152    // Mock 5 regions from server0 were previously hosted on server1
153    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);
154
155    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
156    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
157      0.0f, new ArrayList<>(), 0, 10));
158    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
159      0.0f, oldCachedRegions, 6, 10));
160    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
161      0.0f, new ArrayList<>(), 0, 10));
162    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
163    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
164    loadBalancer.updateClusterMetrics(clusterMetrics);
165
166    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
167      (Map) mockClusterServersWithTables(clusterState);
168    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
169    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
170    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
171    for (RegionPlan plan : plans) {
172      if (plan.getSource().equals(server0)) {
173        regionsMovedFromServer0.add(plan.getRegionInfo());
174        if (!targetServers.containsKey(plan.getDestination())) {
175          targetServers.put(plan.getDestination(), new ArrayList<>());
176        }
177        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
178      }
179    }
180    // should move 5 regions from server0 to server1
181    assertEquals(5, regionsMovedFromServer0.size());
182    assertEquals(5, targetServers.get(server1).size());
183    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
184  }
185
186  @Test
187  public void testThrottlingRegionBeyondThreshold() throws Exception {
188    Configuration conf = HBaseConfiguration.create();
189    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
190    conf.set("hbase.rsgroup.grouploadbalancer.class",
191      CacheAwareLoadBalancer.class.getCanonicalName());
192    RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
193    balancer.setMasterServices(getMockedMaster());
194    balancer.initialize();
195
196    ServerName server0 = servers.get(0);
197    ServerName server1 = servers.get(1);
198    Pair<ServerName, Float> regionRatio = new Pair<>();
199    regionRatio.setFirst(server0);
200    regionRatio.setSecond(1.0f);
201    CacheAwareLoadBalancer internalBalancer =
202      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
203    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
204    RegionInfo mockedInfo = mock(RegionInfo.class);
205    when(mockedInfo.getEncodedName()).thenReturn("region1");
206    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
207    long startTime = EnvironmentEdgeManager.currentTime();
208    synchronized (balancer) {
209      balancer.throttle(plan);
210    }
211    long endTime = EnvironmentEdgeManager.currentTime();
212    assertTrue((endTime - startTime) < 10);
213  }
214
215  @Test
216  public void testThrottlingRegionBelowThreshold() throws Exception {
217    Configuration conf = HBaseConfiguration.create();
218    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
219    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
220    conf.set("hbase.rsgroup.grouploadbalancer.class",
221      CacheAwareLoadBalancer.class.getCanonicalName());
222    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
223    loadBalancer.setMasterServices(getMockedMaster());
224    loadBalancer.initialize();
225    CacheAwareLoadBalancer internalBalancer =
226      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
227    internalBalancer.loadConf(conf);
228
229    ServerName server0 = servers.get(0);
230    ServerName server1 = servers.get(1);
231    Pair<ServerName, Float> regionRatio = new Pair<>();
232    regionRatio.setFirst(server0);
233    regionRatio.setSecond(0.1f);
234    internalBalancer = (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
235    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
236    RegionInfo mockedInfo = mock(RegionInfo.class);
237    when(mockedInfo.getEncodedName()).thenReturn("region1");
238    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
239    long startTime = EnvironmentEdgeManager.currentTime();
240    synchronized (loadBalancer) {
241      loadBalancer.throttle(plan);
242    }
243    long endTime = EnvironmentEdgeManager.currentTime();
244    assertTrue((endTime - startTime) >= 100);
245  }
246
247  @Test
248  public void testThrottlingCacheRatioUnknownOnTarget() throws Exception {
249    Configuration conf = HBaseConfiguration.create();
250    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
251    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
252    conf.set("hbase.rsgroup.grouploadbalancer.class",
253      CacheAwareLoadBalancer.class.getCanonicalName());
254    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
255    loadBalancer.setMasterServices(getMockedMaster());
256    loadBalancer.initialize();
257    CacheAwareLoadBalancer internalBalancer =
258      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
259    internalBalancer.loadConf(conf);
260
261    ServerName server0 = servers.get(0);
262    ServerName server1 = servers.get(1);
263    ServerName server3 = servers.get(2);
264    // setting region cache ratio 100% on server 3, though this is not the target in the region plan
265    Pair<ServerName, Float> regionRatio = new Pair<>();
266    regionRatio.setFirst(server3);
267    regionRatio.setSecond(1.0f);
268    internalBalancer = (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
269    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
270    RegionInfo mockedInfo = mock(RegionInfo.class);
271    when(mockedInfo.getEncodedName()).thenReturn("region1");
272    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
273    long startTime = EnvironmentEdgeManager.currentTime();
274    synchronized (loadBalancer) {
275      loadBalancer.throttle(plan);
276    }
277    long endTime = EnvironmentEdgeManager.currentTime();
278    assertTrue((endTime - startTime) >= 100);
279  }
280
281  @Test
282  public void testThrottlingCacheRatioUnknownForRegion() throws Exception {
283    Configuration conf = HBaseConfiguration.create();
284    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
285    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
286    conf.set("hbase.rsgroup.grouploadbalancer.class",
287      CacheAwareLoadBalancer.class.getCanonicalName());
288    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
289    loadBalancer.setMasterServices(getMockedMaster());
290    loadBalancer.initialize();
291    CacheAwareLoadBalancer internalBalancer =
292      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
293    internalBalancer.loadConf(conf);
294
295    ServerName server0 = servers.get(0);
296    ServerName server1 = servers.get(1);
297    ServerName server3 = servers.get(2);
298    // No cache ratio available for region1
299    RegionInfo mockedInfo = mock(RegionInfo.class);
300    when(mockedInfo.getEncodedName()).thenReturn("region1");
301    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
302    long startTime = EnvironmentEdgeManager.currentTime();
303    synchronized (loadBalancer) {
304      loadBalancer.throttle(plan);
305    }
306    long endTime = EnvironmentEdgeManager.currentTime();
307    assertTrue((endTime - startTime) >= 100);
308  }
309
310  @Test
311  public void testRegionPlansSortedByCacheRatioOnTarget() throws Exception {
312    // The regions are fully cached on old server
313
314    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
315    ServerName server0 = servers.get(0);
316    ServerName server1 = servers.get(1);
317    ServerName server2 = servers.get(2);
318
319    // Simulate on RS with all regions, and two RSes with no regions
320    List<RegionInfo> regionsOnServer0 = randomRegions(15);
321    List<RegionInfo> regionsOnServer1 = randomRegions(0);
322    List<RegionInfo> regionsOnServer2 = randomRegions(0);
323
324    clusterState.put(server0, regionsOnServer0);
325    clusterState.put(server1, regionsOnServer1);
326    clusterState.put(server2, regionsOnServer2);
327
328    // Mock cluster metrics
329    // Mock 5 regions from server0 were previously hosted on server1
330    List<RegionInfo> oldCachedRegions1 = regionsOnServer0.subList(5, 10);
331    List<RegionInfo> oldCachedRegions2 = regionsOnServer0.subList(10, regionsOnServer0.size());
332    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
333    // mock server metrics to set cache ratio as 0 in the RS 0
334    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
335      0.0f, new ArrayList<>(), 0, 10));
336    // mock server metrics to set cache ratio as 1 in the RS 1
337    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
338      0.0f, oldCachedRegions1, 10, 10));
339    // mock server metrics to set cache ratio as .8 in the RS 2
340    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
341      0.0f, oldCachedRegions2, 8, 10));
342    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
343    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
344    loadBalancer.updateClusterMetrics(clusterMetrics);
345
346    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
347      (Map) mockClusterServersWithTables(clusterState);
348    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
349    LOG.debug("plans size: {}", plans.size());
350    LOG.debug("plans: {}", plans);
351    LOG.debug("server1 name: {}", server1.getServerName());
352    // assert the plans are in descending order from the most cached to the least cached
353    int highCacheCount = 0;
354    for (RegionPlan plan : plans) {
355      LOG.debug("plan region: {}, target server: {}", plan.getRegionInfo().getEncodedName(),
356        plan.getDestination().getServerName());
357      if (highCacheCount < 5) {
358        LOG.debug("Count: {}", highCacheCount);
359        assertTrue(oldCachedRegions1.contains(plan.getRegionInfo()));
360        assertFalse(oldCachedRegions2.contains(plan.getRegionInfo()));
361        highCacheCount++;
362      } else {
363        assertTrue(oldCachedRegions2.contains(plan.getRegionInfo()));
364        assertFalse(oldCachedRegions1.contains(plan.getRegionInfo()));
365      }
366    }
367  }
368
369  @Test
370  public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() throws Exception {
371    // The regions are fully cached on old server
372
373    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
374    ServerName server0 = servers.get(0);
375    ServerName server1 = servers.get(1);
376    ServerName server2 = servers.get(2);
377
378    // Simulate that the regions previously hosted by server1 are now hosted on server0
379    List<RegionInfo> regionsOnServer0 = randomRegions(10);
380    List<RegionInfo> regionsOnServer1 = randomRegions(0);
381    List<RegionInfo> regionsOnServer2 = randomRegions(5);
382
383    clusterState.put(server0, regionsOnServer0);
384    clusterState.put(server1, regionsOnServer1);
385    clusterState.put(server2, regionsOnServer2);
386
387    // Mock cluster metrics
388
389    // Mock 5 regions from server0 were previously hosted on server1
390    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);
391
392    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
393    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
394      0.0f, new ArrayList<>(), 0, 10));
395    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
396      0.0f, oldCachedRegions, 10, 10));
397    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
398      0.0f, new ArrayList<>(), 0, 10));
399    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
400    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
401    loadBalancer.updateClusterMetrics(clusterMetrics);
402
403    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
404      (Map) mockClusterServersWithTables(clusterState);
405    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
406    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
407    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
408    for (RegionPlan plan : plans) {
409      if (plan.getSource().equals(server0)) {
410        regionsMovedFromServer0.add(plan.getRegionInfo());
411        if (!targetServers.containsKey(plan.getDestination())) {
412          targetServers.put(plan.getDestination(), new ArrayList<>());
413        }
414        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
415      }
416    }
417    // should move 5 regions from server0 to server1
418    assertEquals(5, regionsMovedFromServer0.size());
419    assertEquals(5, targetServers.get(server1).size());
420    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
421  }
422
423  @Test
424  public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception {
425    // The regions are fully cached on old server
426
427    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
428    ServerName server0 = servers.get(0);
429    ServerName server1 = servers.get(1);
430    ServerName server2 = servers.get(2);
431
432    // Simulate that the regions previously hosted by server1 are now hosted on server0
433    List<RegionInfo> regionsOnServer0 = randomRegions(10);
434    List<RegionInfo> regionsOnServer1 = randomRegions(0);
435    List<RegionInfo> regionsOnServer2 = randomRegions(5);
436
437    clusterState.put(server0, regionsOnServer0);
438    clusterState.put(server1, regionsOnServer1);
439    clusterState.put(server2, regionsOnServer2);
440
441    // Mock cluster metrics
442
443    // Mock 5 regions from server0 were previously hosted on server1
444    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);
445
446    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
447    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
448      1.0f, new ArrayList<>(), 0, 10));
449    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
450      1.0f, oldCachedRegions, 10, 10));
451    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
452      1.0f, new ArrayList<>(), 0, 10));
453    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
454    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
455    loadBalancer.updateClusterMetrics(clusterMetrics);
456
457    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
458      (Map) mockClusterServersWithTables(clusterState);
459    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
460    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
461    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
462    for (RegionPlan plan : plans) {
463      if (plan.getSource().equals(server0)) {
464        regionsMovedFromServer0.add(plan.getRegionInfo());
465        if (!targetServers.containsKey(plan.getDestination())) {
466          targetServers.put(plan.getDestination(), new ArrayList<>());
467        }
468        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
469      }
470    }
471    // should move 5 regions from server0 to server1
472    assertEquals(5, regionsMovedFromServer0.size());
473    assertEquals(5, targetServers.get(server1).size());
474    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
475  }
476
477  @Test
478  public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws Exception {
479    // The regions are partially cached on old server
480
481    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
482    ServerName server0 = servers.get(0);
483    ServerName server1 = servers.get(1);
484    ServerName server2 = servers.get(2);
485
486    // Simulate that the regions previously hosted by server1 are now hosted on server0
487    List<RegionInfo> regionsOnServer0 = randomRegions(10);
488    List<RegionInfo> regionsOnServer1 = randomRegions(0);
489    List<RegionInfo> regionsOnServer2 = randomRegions(5);
490
491    clusterState.put(server0, regionsOnServer0);
492    clusterState.put(server1, regionsOnServer1);
493    clusterState.put(server2, regionsOnServer2);
494
495    // Mock cluster metrics
496
497    // Mock 5 regions from server0 were previously hosted on server1
498    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);
499
500    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
501    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
502      0.2f, new ArrayList<>(), 0, 10));
503    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
504      0.0f, oldCachedRegions, 6, 10));
505    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
506      1.0f, new ArrayList<>(), 0, 10));
507    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
508    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
509    loadBalancer.updateClusterMetrics(clusterMetrics);
510
511    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
512      (Map) mockClusterServersWithTables(clusterState);
513    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
514    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
515    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
516    for (RegionPlan plan : plans) {
517      if (plan.getSource().equals(server0)) {
518        regionsMovedFromServer0.add(plan.getRegionInfo());
519        if (!targetServers.containsKey(plan.getDestination())) {
520          targetServers.put(plan.getDestination(), new ArrayList<>());
521        }
522        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
523      }
524    }
525    assertEquals(5, regionsMovedFromServer0.size());
526    assertEquals(5, targetServers.get(server1).size());
527    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
528  }
529
530  @Timeout(60)
531  @Test
532  public void testConfigUpdateDuringBalance() throws Exception {
533    float expectedOldRatioThreshold = 0.8f;
534    float expectedNewRatioThreshold = 0.95f;
535    long throttlingTimeMillis = 10000;
536
537    conf = HBaseConfiguration.create();
538    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
539    conf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedOldRatioThreshold);
540    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
541    conf.set("hbase.rsgroup.grouploadbalancer.class",
542      CacheAwareLoadBalancer.class.getCanonicalName());
543
544    RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
545    balancer.setMasterServices(getMockedMaster());
546    balancer.initialize();
547
548    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
549    ServerName server0 = servers.get(0);
550    ServerName server1 = servers.get(1);
551    ServerName server2 = servers.get(2);
552
553    // Setup cluster: all 3 regions on server0 (unbalanced)
554    List<RegionInfo> regionsOnServer0 = randomRegions(3);
555    List<RegionInfo> regionsOnServer1 = randomRegions(0);
556    List<RegionInfo> regionsOnServer2 = randomRegions(0);
557
558    clusterState.put(server0, regionsOnServer0);
559    clusterState.put(server1, regionsOnServer1);
560    clusterState.put(server2, regionsOnServer2);
561
562    // Mock metrics: NO cache info for any region = all will be throttled
563    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
564    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
565      0.0f, new ArrayList<>(), 0, 10));
566    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
567      0.0f, new ArrayList<>(), 0, 10));
568    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
569      0.0f, new ArrayList<>(), 0, 10));
570
571    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
572    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
573    balancer.updateClusterMetrics(clusterMetrics);
574
575    final Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
576      (Map) mockClusterServersWithTables(clusterState);
577
578    // Verify initial configuration is set correctly
579    CacheAwareLoadBalancer internalBalancer =
580      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
581    assertEquals(expectedOldRatioThreshold, internalBalancer.ratioThreshold, 0.001f);
582
583    CountDownLatch balanceStarted = new CountDownLatch(1);
584    CountDownLatch configUpdateInitiated = new CountDownLatch(1);
585    long[] configUpdateDuration = new long[1];
586    long[] balanceDuration = new long[1];
587
588    // Actual old ratio threshold used during balance
589    float[] actualOldRatioThresholdDuringBalance = new float[1];
590
591    ExecutorService executor = Executors.newFixedThreadPool(2);
592
593    try {
594      // Thread 1 Simulate similar flow to HMaster.balance() which holds synchronized(balancer) for
595      // the duration of balance
596      Future<Long> balanceFuture = executor.submit(() -> {
597        try {
598          long start = EnvironmentEdgeManager.currentTime();
599          synchronized (balancer) {
600            try {
601              // Simulate beginning of HMaster.balance() mark balancing window open
602              balancer.onBalancingStart();
603              balanceStarted.countDown();
604
605              List<RegionPlan> plans = balancer.balanceCluster(loadOfAllTable);
606              if (plans != null) {
607                for (RegionPlan plan : plans) {
608                  balancer.throttle(plan);
609                }
610              }
611              // Wait until config update is initiated while balance is still in progress
612              configUpdateInitiated.await();
613
614              // Old config should still be visible during current balance run
615              CacheAwareLoadBalancer currentInternal =
616                (CacheAwareLoadBalancer) balancer.getInternalBalancer();
617              actualOldRatioThresholdDuringBalance[0] = currentInternal.ratioThreshold;
618
619            } finally {
620              balancer.onBalancingComplete();
621            }
622          }
623          return EnvironmentEdgeManager.currentTime() - start;
624        } catch (Exception e) {
625          throw new RuntimeException(e);
626        }
627      });
628
629      // Thread 2: Simulate update_all_config / onConfigurationChange
630      Future<Long> configUpdateFuture = executor.submit(() -> {
631        try {
632          long startTime = System.currentTimeMillis();
633          // Wait for balance to start
634          balanceStarted.await();
635
636          // Call onConfigurationChange - should NOT hang
637          Configuration newConf = new Configuration(conf);
638          newConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
639          newConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 10000);
640          newConf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedNewRatioThreshold);
641          balancer.onConfigurationChange(newConf);
642          configUpdateInitiated.countDown();
643
644          return System.currentTimeMillis() - startTime;
645        } catch (Exception e) {
646          throw new RuntimeException(e);
647        }
648      });
649
650      // Wait for both threads to complete
651      configUpdateDuration[0] = configUpdateFuture.get();
652      balanceDuration[0] = balanceFuture.get();
653
654      // Verify that config update didn't hang/timeout waiting for balance
655      assertTrue(configUpdateDuration[0] < balanceDuration[0]);
656
657      // Verify that ratio threshold used during balance is stll the old
658      assertEquals(expectedOldRatioThreshold, actualOldRatioThresholdDuringBalance[0], 0.001f);
659
660      // Verify that config updated successfully after balance completed
661      internalBalancer = (CacheAwareLoadBalancer) balancer.getInternalBalancer();
662      assertEquals(expectedNewRatioThreshold, internalBalancer.ratioThreshold, 0.001f);
663
664    } finally {
665      executor.shutdownNow();
666    }
667  }
668}