001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.when;
026
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033import java.util.TreeMap;
034import java.util.concurrent.CountDownLatch;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.Executors;
037import java.util.concurrent.Future;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.ClusterMetrics;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.ServerMetrics;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.master.RegionPlan;
047import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
048import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
049import org.apache.hadoop.hbase.testclassification.LargeTests;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.Pair;
052import org.junit.jupiter.api.BeforeAll;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.Test;
055import org.junit.jupiter.api.Timeout;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059@Tag(LargeTests.TAG)
060public class TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal
061  extends RSGroupableBalancerTestBase {
062
  /** Logger for the debug output emitted while inspecting generated region plans. */
  private static final Logger LOG =
    LoggerFactory.getLogger(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);

  /**
   * Balancer built once in {@link #beforeAllTests()} and shared by the cluster-balancing tests.
   * Tests that need their own configuration construct a separate RSGroupBasedLoadBalancer instead.
   */
  private static RSGroupBasedLoadBalancer loadBalancer;
067
068  @BeforeAll
069  public static void beforeAllTests() throws Exception {
070    groups = new String[] { RSGroupInfo.DEFAULT_GROUP };
071    servers = generateServers(3);
072    groupMap = constructGroupInfo(servers, groups);
073    tableDescs = constructTableDesc(false);
074    Configuration cong = HBaseConfiguration.create();
075    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
076    conf.set("hbase.rsgroup.grouploadbalancer.class",
077      CacheAwareLoadBalancer.class.getCanonicalName());
078    loadBalancer = new RSGroupBasedLoadBalancer();
079    loadBalancer.setMasterServices(getMockedMaster());
080    loadBalancer.initialize();
081  }
082
083  @Test
084  public void testRegionsNotCachedOnOldServerAndCurrentServer() throws Exception {
085    // The regions are not cached on old server as well as the current server. This causes
086    // skewness in the region allocation which should be fixed by the balancer
087
088    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
089    ServerName server0 = servers.get(0);
090    ServerName server1 = servers.get(1);
091    ServerName server2 = servers.get(2);
092
093    // Simulate that the regions previously hosted by server1 are now hosted on server0
094    List<RegionInfo> regionsOnServer0 = randomRegions(10);
095    List<RegionInfo> regionsOnServer1 = randomRegions(0);
096    List<RegionInfo> regionsOnServer2 = randomRegions(5);
097
098    clusterState.put(server0, regionsOnServer0);
099    clusterState.put(server1, regionsOnServer1);
100    clusterState.put(server2, regionsOnServer2);
101
102    // Mock cluster metrics
103    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
104    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
105      0.0f, new ArrayList<>(), 0, 10));
106    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
107      0.0f, new ArrayList<>(), 0, 10));
108    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
109      0.0f, new ArrayList<>(), 0, 10));
110    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
111    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
112    loadBalancer.updateClusterMetrics(clusterMetrics);
113
114    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
115      (Map) mockClusterServersWithTables(clusterState);
116    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
117    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
118    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
119    for (RegionPlan plan : plans) {
120      if (plan.getSource().equals(server0)) {
121        regionsMovedFromServer0.add(plan.getRegionInfo());
122        if (!targetServers.containsKey(plan.getDestination())) {
123          targetServers.put(plan.getDestination(), new ArrayList<>());
124        }
125        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
126      }
127    }
128    // should move 5 regions from server0 to server 1
129    assertEquals(5, regionsMovedFromServer0.size());
130    assertEquals(5, targetServers.get(server1).size());
131  }
132
133  /**
134   * Regions on the overloaded RS report low block-cache ratio; no RS reports prefetch/historical
135   * cache for those regions (so {@link CacheAwareLoadBalancer.CacheAwareCandidateGenerator} has no
136   * "old server" to prefer). Another RS has ample free block cache. The balancer should still emit
137   * plans that shed load from the hot RS onto the idle RS with spare cache capacity.
138   */
139  @Test
140  public void testLowCacheRatioNoHistoricalCacheRelocatesWhenTargetHasFreeBlockCache()
141    throws Exception {
142    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
143    ServerName server0 = servers.get(0);
144    ServerName server1 = servers.get(1);
145    ServerName server2 = servers.get(2);
146
147    List<RegionInfo> regionsOnServer0 = randomRegions(10);
148    List<RegionInfo> regionsOnServer1 = randomRegions(0);
149    List<RegionInfo> regionsOnServer2 = randomRegions(5);
150
151    clusterState.put(server0, regionsOnServer0);
152    clusterState.put(server1, regionsOnServer1);
153    clusterState.put(server2, regionsOnServer2);
154
155    // Below LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT (0.35);
156    ServerMetrics sm0 = mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, 0.1f,
157      new ArrayList<>(), 0, 10);
158    when(sm0.getCacheFreeSize()).thenReturn(0L);
159    ServerMetrics sm1 = mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, 0.0f,
160      new ArrayList<>(), 0, 10);
161    // Simulates 1GB free cache space on server1
162    when(sm1.getCacheFreeSize()).thenReturn(1024L * 1024 * 1024);
163    ServerMetrics sm2 = mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, 1.0f,
164      new ArrayList<>(), 0, 10);
165    when(sm2.getCacheFreeSize()).thenReturn(0L);
166
167    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
168    serverMetricsMap.put(server0, sm0);
169    serverMetricsMap.put(server1, sm1);
170    serverMetricsMap.put(server2, sm2);
171    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
172    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
173    loadBalancer.updateClusterMetrics(clusterMetrics);
174
175    CacheAwareLoadBalancer internalBalancer =
176      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
177    assertNotNull(internalBalancer);
178    assertTrue(internalBalancer.regionCacheRatioOnOldServerMap.isEmpty());
179
180    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
181      (Map) mockClusterServersWithTables(clusterState);
182    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTable);
183    assertNotNull(plans);
184
185    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
186    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
187    for (RegionPlan plan : plans) {
188      if (plan.getSource().equals(server0)) {
189        regionsMovedFromServer0.add(plan.getRegionInfo());
190        if (!targetServers.containsKey(plan.getDestination())) {
191          targetServers.put(plan.getDestination(), new ArrayList<>());
192        }
193        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
194      }
195    }
196    assertEquals(5, regionsMovedFromServer0.size());
197    assertNotNull(targetServers.get(server1));
198    assertEquals(5, targetServers.get(server1).size());
199  }
200
201  @Test
202  public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws Exception {
203    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
204    ServerName server0 = servers.get(0);
205    ServerName server1 = servers.get(1);
206    ServerName server2 = servers.get(2);
207
208    // Simulate that the regions previously hosted by server1 are now hosted on server0
209    List<RegionInfo> regionsOnServer0 = randomRegions(10);
210    List<RegionInfo> regionsOnServer1 = randomRegions(0);
211    List<RegionInfo> regionsOnServer2 = randomRegions(5);
212
213    clusterState.put(server0, regionsOnServer0);
214    clusterState.put(server1, regionsOnServer1);
215    clusterState.put(server2, regionsOnServer2);
216
217    // Mock cluster metrics
218
219    // Mock 5 regions from server0 were previously hosted on server1
220    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());
221
222    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
223    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
224      0.0f, new ArrayList<>(), 0, 10));
225    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
226      0.0f, oldCachedRegions, 6, 10));
227    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
228      0.0f, new ArrayList<>(), 0, 10));
229    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
230    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
231    loadBalancer.updateClusterMetrics(clusterMetrics);
232
233    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
234      (Map) mockClusterServersWithTables(clusterState);
235    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
236    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
237    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
238    for (RegionPlan plan : plans) {
239      if (plan.getSource().equals(server0)) {
240        regionsMovedFromServer0.add(plan.getRegionInfo());
241        if (!targetServers.containsKey(plan.getDestination())) {
242          targetServers.put(plan.getDestination(), new ArrayList<>());
243        }
244        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
245      }
246    }
247    // should move 5 regions from server0 to server1
248    assertEquals(5, regionsMovedFromServer0.size());
249    assertEquals(5, targetServers.get(server1).size());
250    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
251  }
252
253  @Test
254  public void testThrottlingRegionBeyondThreshold() throws Exception {
255    Configuration conf = HBaseConfiguration.create();
256    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
257    conf.set("hbase.rsgroup.grouploadbalancer.class",
258      CacheAwareLoadBalancer.class.getCanonicalName());
259    RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
260    balancer.setMasterServices(getMockedMaster());
261    balancer.initialize();
262
263    ServerName server0 = servers.get(0);
264    ServerName server1 = servers.get(1);
265    Pair<ServerName, Float> regionRatio = new Pair<>();
266    regionRatio.setFirst(server0);
267    regionRatio.setSecond(1.0f);
268    CacheAwareLoadBalancer internalBalancer =
269      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
270    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
271    RegionInfo mockedInfo = mock(RegionInfo.class);
272    when(mockedInfo.getEncodedName()).thenReturn("region1");
273    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
274    long startTime = EnvironmentEdgeManager.currentTime();
275    synchronized (balancer) {
276      balancer.throttle(plan);
277    }
278    long endTime = EnvironmentEdgeManager.currentTime();
279    assertTrue((endTime - startTime) < 10);
280  }
281
282  @Test
283  public void testThrottlingRegionBelowThreshold() throws Exception {
284    Configuration conf = HBaseConfiguration.create();
285    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
286    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
287    conf.set("hbase.rsgroup.grouploadbalancer.class",
288      CacheAwareLoadBalancer.class.getCanonicalName());
289    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
290    loadBalancer.setMasterServices(getMockedMaster());
291    loadBalancer.initialize();
292    CacheAwareLoadBalancer internalBalancer =
293      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
294    internalBalancer.loadConf(conf);
295
296    ServerName server0 = servers.get(0);
297    ServerName server1 = servers.get(1);
298    Pair<ServerName, Float> regionRatio = new Pair<>();
299    regionRatio.setFirst(server0);
300    regionRatio.setSecond(0.1f);
301    internalBalancer = (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
302    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
303    RegionInfo mockedInfo = mock(RegionInfo.class);
304    when(mockedInfo.getEncodedName()).thenReturn("region1");
305    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
306    long startTime = EnvironmentEdgeManager.currentTime();
307    synchronized (loadBalancer) {
308      loadBalancer.throttle(plan);
309    }
310    long endTime = EnvironmentEdgeManager.currentTime();
311    assertTrue((endTime - startTime) >= 100);
312  }
313
314  @Test
315  public void testThrottlingCacheRatioUnknownOnTarget() throws Exception {
316    Configuration conf = HBaseConfiguration.create();
317    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
318    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
319    conf.set("hbase.rsgroup.grouploadbalancer.class",
320      CacheAwareLoadBalancer.class.getCanonicalName());
321    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
322    loadBalancer.setMasterServices(getMockedMaster());
323    loadBalancer.initialize();
324    CacheAwareLoadBalancer internalBalancer =
325      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
326    internalBalancer.loadConf(conf);
327
328    ServerName server0 = servers.get(0);
329    ServerName server1 = servers.get(1);
330    ServerName server3 = servers.get(2);
331    // setting region cache ratio 100% on server 3, though this is not the target in the region plan
332    Pair<ServerName, Float> regionRatio = new Pair<>();
333    regionRatio.setFirst(server3);
334    regionRatio.setSecond(1.0f);
335    internalBalancer = (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
336    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
337    RegionInfo mockedInfo = mock(RegionInfo.class);
338    when(mockedInfo.getEncodedName()).thenReturn("region1");
339    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
340    long startTime = EnvironmentEdgeManager.currentTime();
341    synchronized (loadBalancer) {
342      loadBalancer.throttle(plan);
343    }
344    long endTime = EnvironmentEdgeManager.currentTime();
345    assertTrue((endTime - startTime) >= 100);
346  }
347
348  @Test
349  public void testThrottlingCacheRatioUnknownForRegion() throws Exception {
350    Configuration conf = HBaseConfiguration.create();
351    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
352    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
353    conf.set("hbase.rsgroup.grouploadbalancer.class",
354      CacheAwareLoadBalancer.class.getCanonicalName());
355    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
356    loadBalancer.setMasterServices(getMockedMaster());
357    loadBalancer.initialize();
358    CacheAwareLoadBalancer internalBalancer =
359      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
360    internalBalancer.loadConf(conf);
361
362    ServerName server0 = servers.get(0);
363    ServerName server1 = servers.get(1);
364    ServerName server3 = servers.get(2);
365    // No cache ratio available for region1
366    RegionInfo mockedInfo = mock(RegionInfo.class);
367    when(mockedInfo.getEncodedName()).thenReturn("region1");
368    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
369    long startTime = EnvironmentEdgeManager.currentTime();
370    synchronized (loadBalancer) {
371      loadBalancer.throttle(plan);
372    }
373    long endTime = EnvironmentEdgeManager.currentTime();
374    assertTrue((endTime - startTime) >= 100);
375  }
376
377  @Test
378  public void testRegionPlansSortedByCacheRatioOnTarget() throws Exception {
379    // The regions are fully cached on old server
380
381    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
382    ServerName server0 = servers.get(0);
383    ServerName server1 = servers.get(1);
384    ServerName server2 = servers.get(2);
385
386    // Simulate on RS with all regions, and two RSes with no regions
387    List<RegionInfo> regionsOnServer0 = randomRegions(15);
388    List<RegionInfo> regionsOnServer1 = randomRegions(0);
389    List<RegionInfo> regionsOnServer2 = randomRegions(0);
390
391    clusterState.put(server0, regionsOnServer0);
392    clusterState.put(server1, regionsOnServer1);
393    clusterState.put(server2, regionsOnServer2);
394
395    // Mock cluster metrics
396    // Mock 5 regions from server0 were previously hosted on server1
397    List<RegionInfo> oldCachedRegions1 = regionsOnServer0.subList(5, 10);
398    List<RegionInfo> oldCachedRegions2 = regionsOnServer0.subList(10, regionsOnServer0.size());
399    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
400    // mock server metrics to set cache ratio as 0 in the RS 0
401    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
402      0.0f, new ArrayList<>(), 0, 10));
403    // mock server metrics to set cache ratio as 1 in the RS 1
404    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
405      0.0f, oldCachedRegions1, 10, 10));
406    // mock server metrics to set cache ratio as .8 in the RS 2
407    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
408      0.0f, oldCachedRegions2, 8, 10));
409    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
410    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
411    loadBalancer.updateClusterMetrics(clusterMetrics);
412
413    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
414      (Map) mockClusterServersWithTables(clusterState);
415    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
416    LOG.debug("plans size: {}", plans.size());
417    LOG.debug("plans: {}", plans);
418    LOG.debug("server1 name: {}", server1.getServerName());
419    // assert the plans are in descending order from the most cached to the least cached
420    int highCacheCount = 0;
421    for (RegionPlan plan : plans) {
422      LOG.debug("plan region: {}, target server: {}", plan.getRegionInfo().getEncodedName(),
423        plan.getDestination().getServerName());
424      if (highCacheCount < 5) {
425        LOG.debug("Count: {}", highCacheCount);
426        assertTrue(oldCachedRegions1.contains(plan.getRegionInfo()));
427        assertFalse(oldCachedRegions2.contains(plan.getRegionInfo()));
428        highCacheCount++;
429      } else {
430        assertTrue(oldCachedRegions2.contains(plan.getRegionInfo()));
431        assertFalse(oldCachedRegions1.contains(plan.getRegionInfo()));
432      }
433    }
434  }
435
436  @Test
437  public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() throws Exception {
438    // The regions are fully cached on old server
439
440    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
441    ServerName server0 = servers.get(0);
442    ServerName server1 = servers.get(1);
443    ServerName server2 = servers.get(2);
444
445    // Simulate that the regions previously hosted by server1 are now hosted on server0
446    List<RegionInfo> regionsOnServer0 = randomRegions(10);
447    List<RegionInfo> regionsOnServer1 = randomRegions(0);
448    List<RegionInfo> regionsOnServer2 = randomRegions(5);
449
450    clusterState.put(server0, regionsOnServer0);
451    clusterState.put(server1, regionsOnServer1);
452    clusterState.put(server2, regionsOnServer2);
453
454    // Mock cluster metrics
455
456    // Mock 5 regions from server0 were previously hosted on server1
457    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());
458
459    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
460    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
461      0.0f, new ArrayList<>(), 0, 10));
462    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
463      0.0f, oldCachedRegions, 10, 10));
464    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
465      0.0f, new ArrayList<>(), 0, 10));
466    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
467    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
468    loadBalancer.updateClusterMetrics(clusterMetrics);
469
470    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
471      (Map) mockClusterServersWithTables(clusterState);
472    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
473    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
474    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
475    for (RegionPlan plan : plans) {
476      if (plan.getSource().equals(server0)) {
477        regionsMovedFromServer0.add(plan.getRegionInfo());
478        if (!targetServers.containsKey(plan.getDestination())) {
479          targetServers.put(plan.getDestination(), new ArrayList<>());
480        }
481        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
482      }
483    }
484    // should move 5 regions from server0 to server1
485    assertEquals(5, regionsMovedFromServer0.size());
486    assertEquals(5, targetServers.get(server1).size());
487    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
488  }
489
490  @Test
491  public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception {
492    // The regions are fully cached on old server
493
494    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
495    ServerName server0 = servers.get(0);
496    ServerName server1 = servers.get(1);
497    ServerName server2 = servers.get(2);
498
499    // Simulate that the regions previously hosted by server1 are now hosted on server0
500    List<RegionInfo> regionsOnServer0 = randomRegions(10);
501    List<RegionInfo> regionsOnServer1 = randomRegions(0);
502    List<RegionInfo> regionsOnServer2 = randomRegions(5);
503
504    clusterState.put(server0, regionsOnServer0);
505    clusterState.put(server1, regionsOnServer1);
506    clusterState.put(server2, regionsOnServer2);
507
508    // Mock cluster metrics
509
510    // Mock 5 regions from server0 were previously hosted on server1
511    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());
512
513    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
514    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
515      1.0f, new ArrayList<>(), 0, 10));
516    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
517      1.0f, oldCachedRegions, 10, 10));
518    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
519      1.0f, new ArrayList<>(), 0, 10));
520    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
521    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
522    loadBalancer.updateClusterMetrics(clusterMetrics);
523
524    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
525      (Map) mockClusterServersWithTables(clusterState);
526    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
527    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
528    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
529    for (RegionPlan plan : plans) {
530      if (plan.getSource().equals(server0)) {
531        regionsMovedFromServer0.add(plan.getRegionInfo());
532        if (!targetServers.containsKey(plan.getDestination())) {
533          targetServers.put(plan.getDestination(), new ArrayList<>());
534        }
535        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
536      }
537    }
538    // should move 5 regions from server0 to server1
539    assertEquals(5, regionsMovedFromServer0.size());
540    assertEquals(5, targetServers.get(server1).size());
541    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
542  }
543
544  @Test
545  public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws Exception {
546    // The regions are partially cached on old server
547
548    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
549    ServerName server0 = servers.get(0);
550    ServerName server1 = servers.get(1);
551    ServerName server2 = servers.get(2);
552
553    // Simulate that the regions previously hosted by server1 are now hosted on server0
554    List<RegionInfo> regionsOnServer0 = randomRegions(10);
555    List<RegionInfo> regionsOnServer1 = randomRegions(0);
556    List<RegionInfo> regionsOnServer2 = randomRegions(5);
557
558    clusterState.put(server0, regionsOnServer0);
559    clusterState.put(server1, regionsOnServer1);
560    clusterState.put(server2, regionsOnServer2);
561
562    // Mock cluster metrics
563
564    // Mock 5 regions from server0 were previously hosted on server1
565    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());
566
567    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
568    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
569      0.2f, new ArrayList<>(), 0, 10));
570    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
571      0.0f, oldCachedRegions, 6, 10));
572    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
573      1.0f, new ArrayList<>(), 0, 10));
574    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
575    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
576    loadBalancer.updateClusterMetrics(clusterMetrics);
577
578    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
579      (Map) mockClusterServersWithTables(clusterState);
580    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
581    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
582    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
583    for (RegionPlan plan : plans) {
584      if (plan.getSource().equals(server0)) {
585        regionsMovedFromServer0.add(plan.getRegionInfo());
586        if (!targetServers.containsKey(plan.getDestination())) {
587          targetServers.put(plan.getDestination(), new ArrayList<>());
588        }
589        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
590      }
591    }
592    assertEquals(5, regionsMovedFromServer0.size());
593    assertEquals(5, targetServers.get(server1).size());
594    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
595  }
596
597  @Timeout(60)
598  @Test
599  public void testConfigUpdateDuringBalance() throws Exception {
600    float expectedOldRatioThreshold = 0.8f;
601    float expectedNewRatioThreshold = 0.95f;
602    long throttlingTimeMillis = 10000;
603
604    conf = HBaseConfiguration.create();
605    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
606    conf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedOldRatioThreshold);
607    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
608    conf.set("hbase.rsgroup.grouploadbalancer.class",
609      CacheAwareLoadBalancer.class.getCanonicalName());
610
611    RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
612    balancer.setMasterServices(getMockedMaster());
613    balancer.initialize();
614
615    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
616    ServerName server0 = servers.get(0);
617    ServerName server1 = servers.get(1);
618    ServerName server2 = servers.get(2);
619
620    // Setup cluster: all 3 regions on server0 (unbalanced)
621    List<RegionInfo> regionsOnServer0 = randomRegions(3);
622    List<RegionInfo> regionsOnServer1 = randomRegions(0);
623    List<RegionInfo> regionsOnServer2 = randomRegions(0);
624
625    clusterState.put(server0, regionsOnServer0);
626    clusterState.put(server1, regionsOnServer1);
627    clusterState.put(server2, regionsOnServer2);
628
629    // Mock metrics: NO cache info for any region = all will be throttled
630    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
631    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
632      0.0f, new ArrayList<>(), 0, 10));
633    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
634      0.0f, new ArrayList<>(), 0, 10));
635    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
636      0.0f, new ArrayList<>(), 0, 10));
637
638    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
639    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
640    balancer.updateClusterMetrics(clusterMetrics);
641
642    final Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
643      (Map) mockClusterServersWithTables(clusterState);
644
645    // Verify initial configuration is set correctly
646    CacheAwareLoadBalancer internalBalancer =
647      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
648    assertEquals(expectedOldRatioThreshold, internalBalancer.ratioThreshold, 0.001f);
649
650    CountDownLatch balanceStarted = new CountDownLatch(1);
651    CountDownLatch configUpdateInitiated = new CountDownLatch(1);
652    long[] configUpdateDuration = new long[1];
653    long[] balanceDuration = new long[1];
654
655    // Actual old ratio threshold used during balance
656    float[] actualOldRatioThresholdDuringBalance = new float[1];
657
658    ExecutorService executor = Executors.newFixedThreadPool(2);
659
660    try {
      // Thread 1: simulate the HMaster.balance() flow, which holds synchronized(balancer) for
      // the duration of the balance run
663      Future<Long> balanceFuture = executor.submit(() -> {
664        try {
665          long start = EnvironmentEdgeManager.currentTime();
666          synchronized (balancer) {
667            try {
              // Simulate the start of HMaster.balance(): mark the balancing window as open
669              balancer.onBalancingStart();
670              balanceStarted.countDown();
671
672              List<RegionPlan> plans = balancer.balanceCluster(loadOfAllTable);
673              if (plans != null) {
674                for (RegionPlan plan : plans) {
675                  balancer.throttle(plan);
676                }
677              }
678              // Wait until config update is initiated while balance is still in progress
679              configUpdateInitiated.await();
680
681              // Old config should still be visible during current balance run
682              CacheAwareLoadBalancer currentInternal =
683                (CacheAwareLoadBalancer) balancer.getInternalBalancer();
684              actualOldRatioThresholdDuringBalance[0] = currentInternal.ratioThreshold;
685
686            } finally {
687              balancer.onBalancingComplete();
688            }
689          }
690          return EnvironmentEdgeManager.currentTime() - start;
691        } catch (Exception e) {
692          throw new RuntimeException(e);
693        }
694      });
695
696      // Thread 2: Simulate update_all_config / onConfigurationChange
697      Future<Long> configUpdateFuture = executor.submit(() -> {
698        try {
699          long startTime = System.currentTimeMillis();
700          // Wait for balance to start
701          balanceStarted.await();
702
703          // Call onConfigurationChange - should NOT hang
704          Configuration newConf = new Configuration(conf);
705          newConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
706          newConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 10000);
707          newConf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedNewRatioThreshold);
708          balancer.onConfigurationChange(newConf);
709          configUpdateInitiated.countDown();
710
711          return System.currentTimeMillis() - startTime;
712        } catch (Exception e) {
713          throw new RuntimeException(e);
714        }
715      });
716
717      // Wait for both threads to complete
718      configUpdateDuration[0] = configUpdateFuture.get();
719      balanceDuration[0] = balanceFuture.get();
720
721      // Verify that config update didn't hang/timeout waiting for balance
722      assertTrue(configUpdateDuration[0] < balanceDuration[0]);
723
      // Verify that the ratio threshold used during balance is still the old value
725      assertEquals(expectedOldRatioThreshold, actualOldRatioThresholdDuringBalance[0], 0.001f);
726
727      // Verify that config updated successfully after balance completed
728      internalBalancer = (CacheAwareLoadBalancer) balancer.getInternalBalancer();
729      assertEquals(expectedNewRatioThreshold, internalBalancer.ratioThreshold, 0.001f);
730
731    } finally {
732      executor.shutdownNow();
733    }
734  }
735}