001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.TreeMap;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.Future;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.ClusterMetrics;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.ServerMetrics;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.master.RegionPlan;
047import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
048import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
049import org.apache.hadoop.hbase.testclassification.LargeTests;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.Pair;
052import org.junit.BeforeClass;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059@Category(LargeTests.class)
060public class TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal
061  extends RSGroupableBalancerTestBase {
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
064    .forClass(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);
065
066  private static final Logger LOG =
067    LoggerFactory.getLogger(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);
068
069  private static RSGroupBasedLoadBalancer loadBalancer;
070
071  @BeforeClass
072  public static void beforeAllTests() throws Exception {
073    groups = new String[] { RSGroupInfo.DEFAULT_GROUP };
074    servers = generateServers(3);
075    groupMap = constructGroupInfo(servers, groups);
076    tableDescs = constructTableDesc(false);
077    Configuration cong = HBaseConfiguration.create();
078    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
079    conf.set("hbase.rsgroup.grouploadbalancer.class",
080      CacheAwareLoadBalancer.class.getCanonicalName());
081    loadBalancer = new RSGroupBasedLoadBalancer();
082    loadBalancer.setMasterServices(getMockedMaster());
083    loadBalancer.initialize();
084  }
085
086  @Test
087  public void testRegionsNotCachedOnOldServerAndCurrentServer() throws Exception {
088    // The regions are not cached on old server as well as the current server. This causes
089    // skewness in the region allocation which should be fixed by the balancer
090
091    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
092    ServerName server0 = servers.get(0);
093    ServerName server1 = servers.get(1);
094    ServerName server2 = servers.get(2);
095
096    // Simulate that the regions previously hosted by server1 are now hosted on server0
097    List<RegionInfo> regionsOnServer0 = randomRegions(10);
098    List<RegionInfo> regionsOnServer1 = randomRegions(0);
099    List<RegionInfo> regionsOnServer2 = randomRegions(5);
100
101    clusterState.put(server0, regionsOnServer0);
102    clusterState.put(server1, regionsOnServer1);
103    clusterState.put(server2, regionsOnServer2);
104
105    // Mock cluster metrics
106    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
107    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
108      0.0f, new ArrayList<>(), 0, 10));
109    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
110      0.0f, new ArrayList<>(), 0, 10));
111    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
112      0.0f, new ArrayList<>(), 0, 10));
113    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
114    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
115    loadBalancer.updateClusterMetrics(clusterMetrics);
116
117    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
118      (Map) mockClusterServersWithTables(clusterState);
119    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
120    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
121    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
122    for (RegionPlan plan : plans) {
123      if (plan.getSource().equals(server0)) {
124        regionsMovedFromServer0.add(plan.getRegionInfo());
125        if (!targetServers.containsKey(plan.getDestination())) {
126          targetServers.put(plan.getDestination(), new ArrayList<>());
127        }
128        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
129      }
130    }
131    // should move 5 regions from server0 to server 1
132    assertEquals(5, regionsMovedFromServer0.size());
133    assertEquals(5, targetServers.get(server1).size());
134  }
135
136  @Test
137  public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws Exception {
138    // The regions are partially cached on old server but not cached on the current server
139
140    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
141    ServerName server0 = servers.get(0);
142    ServerName server1 = servers.get(1);
143    ServerName server2 = servers.get(2);
144
145    // Simulate that the regions previously hosted by server1 are now hosted on server0
146    List<RegionInfo> regionsOnServer0 = randomRegions(10);
147    List<RegionInfo> regionsOnServer1 = randomRegions(0);
148    List<RegionInfo> regionsOnServer2 = randomRegions(5);
149
150    clusterState.put(server0, regionsOnServer0);
151    clusterState.put(server1, regionsOnServer1);
152    clusterState.put(server2, regionsOnServer2);
153
154    // Mock cluster metrics
155
156    // Mock 5 regions from server0 were previously hosted on server1
157    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);
158
159    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
160    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
161      0.0f, new ArrayList<>(), 0, 10));
162    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
163      0.0f, oldCachedRegions, 6, 10));
164    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
165      0.0f, new ArrayList<>(), 0, 10));
166    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
167    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
168    loadBalancer.updateClusterMetrics(clusterMetrics);
169
170    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
171      (Map) mockClusterServersWithTables(clusterState);
172    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
173    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
174    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
175    for (RegionPlan plan : plans) {
176      if (plan.getSource().equals(server0)) {
177        regionsMovedFromServer0.add(plan.getRegionInfo());
178        if (!targetServers.containsKey(plan.getDestination())) {
179          targetServers.put(plan.getDestination(), new ArrayList<>());
180        }
181        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
182      }
183    }
184    // should move 5 regions from server0 to server1
185    assertEquals(5, regionsMovedFromServer0.size());
186    assertEquals(5, targetServers.get(server1).size());
187    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
188  }
189
190  @Test
191  public void testThrottlingRegionBeyondThreshold() throws Exception {
192    Configuration conf = HBaseConfiguration.create();
193    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
194    conf.set("hbase.rsgroup.grouploadbalancer.class",
195      CacheAwareLoadBalancer.class.getCanonicalName());
196    RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
197    balancer.setMasterServices(getMockedMaster());
198    balancer.initialize();
199
200    ServerName server0 = servers.get(0);
201    ServerName server1 = servers.get(1);
202    Pair<ServerName, Float> regionRatio = new Pair<>();
203    regionRatio.setFirst(server0);
204    regionRatio.setSecond(1.0f);
205    CacheAwareLoadBalancer internalBalancer =
206      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
207    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
208    RegionInfo mockedInfo = mock(RegionInfo.class);
209    when(mockedInfo.getEncodedName()).thenReturn("region1");
210    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
211    long startTime = EnvironmentEdgeManager.currentTime();
212    synchronized (balancer) {
213      balancer.throttle(plan);
214    }
215    long endTime = EnvironmentEdgeManager.currentTime();
216    assertTrue((endTime - startTime) < 10);
217  }
218
219  @Test
220  public void testThrottlingRegionBelowThreshold() throws Exception {
221    Configuration conf = HBaseConfiguration.create();
222    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
223    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
224    conf.set("hbase.rsgroup.grouploadbalancer.class",
225      CacheAwareLoadBalancer.class.getCanonicalName());
226    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
227    loadBalancer.setMasterServices(getMockedMaster());
228    loadBalancer.initialize();
229    CacheAwareLoadBalancer internalBalancer =
230      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
231    internalBalancer.loadConf(conf);
232
233    ServerName server0 = servers.get(0);
234    ServerName server1 = servers.get(1);
235    Pair<ServerName, Float> regionRatio = new Pair<>();
236    regionRatio.setFirst(server0);
237    regionRatio.setSecond(0.1f);
238    internalBalancer = (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
239    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
240    RegionInfo mockedInfo = mock(RegionInfo.class);
241    when(mockedInfo.getEncodedName()).thenReturn("region1");
242    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
243    long startTime = EnvironmentEdgeManager.currentTime();
244    synchronized (loadBalancer) {
245      loadBalancer.throttle(plan);
246    }
247    long endTime = EnvironmentEdgeManager.currentTime();
248    assertTrue((endTime - startTime) >= 100);
249  }
250
251  @Test
252  public void testThrottlingCacheRatioUnknownOnTarget() throws Exception {
253    Configuration conf = HBaseConfiguration.create();
254    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
255    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
256    conf.set("hbase.rsgroup.grouploadbalancer.class",
257      CacheAwareLoadBalancer.class.getCanonicalName());
258    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
259    loadBalancer.setMasterServices(getMockedMaster());
260    loadBalancer.initialize();
261    CacheAwareLoadBalancer internalBalancer =
262      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
263    internalBalancer.loadConf(conf);
264
265    ServerName server0 = servers.get(0);
266    ServerName server1 = servers.get(1);
267    ServerName server3 = servers.get(2);
268    // setting region cache ratio 100% on server 3, though this is not the target in the region plan
269    Pair<ServerName, Float> regionRatio = new Pair<>();
270    regionRatio.setFirst(server3);
271    regionRatio.setSecond(1.0f);
272    internalBalancer = (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
273    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
274    RegionInfo mockedInfo = mock(RegionInfo.class);
275    when(mockedInfo.getEncodedName()).thenReturn("region1");
276    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
277    long startTime = EnvironmentEdgeManager.currentTime();
278    synchronized (loadBalancer) {
279      loadBalancer.throttle(plan);
280    }
281    long endTime = EnvironmentEdgeManager.currentTime();
282    assertTrue((endTime - startTime) >= 100);
283  }
284
285  @Test
286  public void testThrottlingCacheRatioUnknownForRegion() throws Exception {
287    Configuration conf = HBaseConfiguration.create();
288    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
289    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
290    conf.set("hbase.rsgroup.grouploadbalancer.class",
291      CacheAwareLoadBalancer.class.getCanonicalName());
292    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
293    loadBalancer.setMasterServices(getMockedMaster());
294    loadBalancer.initialize();
295    CacheAwareLoadBalancer internalBalancer =
296      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
297    internalBalancer.loadConf(conf);
298
299    ServerName server0 = servers.get(0);
300    ServerName server1 = servers.get(1);
301    ServerName server3 = servers.get(2);
302    // No cache ratio available for region1
303    RegionInfo mockedInfo = mock(RegionInfo.class);
304    when(mockedInfo.getEncodedName()).thenReturn("region1");
305    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
306    long startTime = EnvironmentEdgeManager.currentTime();
307    synchronized (loadBalancer) {
308      loadBalancer.throttle(plan);
309    }
310    long endTime = EnvironmentEdgeManager.currentTime();
311    assertTrue((endTime - startTime) >= 100);
312  }
313
314  @Test
315  public void testRegionPlansSortedByCacheRatioOnTarget() throws Exception {
316    // The regions are fully cached on old server
317
318    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
319    ServerName server0 = servers.get(0);
320    ServerName server1 = servers.get(1);
321    ServerName server2 = servers.get(2);
322
323    // Simulate on RS with all regions, and two RSes with no regions
324    List<RegionInfo> regionsOnServer0 = randomRegions(15);
325    List<RegionInfo> regionsOnServer1 = randomRegions(0);
326    List<RegionInfo> regionsOnServer2 = randomRegions(0);
327
328    clusterState.put(server0, regionsOnServer0);
329    clusterState.put(server1, regionsOnServer1);
330    clusterState.put(server2, regionsOnServer2);
331
332    // Mock cluster metrics
333    // Mock 5 regions from server0 were previously hosted on server1
334    List<RegionInfo> oldCachedRegions1 = regionsOnServer0.subList(5, 10);
335    List<RegionInfo> oldCachedRegions2 = regionsOnServer0.subList(10, regionsOnServer0.size());
336    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
337    // mock server metrics to set cache ratio as 0 in the RS 0
338    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
339      0.0f, new ArrayList<>(), 0, 10));
340    // mock server metrics to set cache ratio as 1 in the RS 1
341    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
342      0.0f, oldCachedRegions1, 10, 10));
343    // mock server metrics to set cache ratio as .8 in the RS 2
344    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
345      0.0f, oldCachedRegions2, 8, 10));
346    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
347    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
348    loadBalancer.updateClusterMetrics(clusterMetrics);
349
350    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
351      (Map) mockClusterServersWithTables(clusterState);
352    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
353    LOG.debug("plans size: {}", plans.size());
354    LOG.debug("plans: {}", plans);
355    LOG.debug("server1 name: {}", server1.getServerName());
356    // assert the plans are in descending order from the most cached to the least cached
357    int highCacheCount = 0;
358    for (RegionPlan plan : plans) {
359      LOG.debug("plan region: {}, target server: {}", plan.getRegionInfo().getEncodedName(),
360        plan.getDestination().getServerName());
361      if (highCacheCount < 5) {
362        LOG.debug("Count: {}", highCacheCount);
363        assertTrue(oldCachedRegions1.contains(plan.getRegionInfo()));
364        assertFalse(oldCachedRegions2.contains(plan.getRegionInfo()));
365        highCacheCount++;
366      } else {
367        assertTrue(oldCachedRegions2.contains(plan.getRegionInfo()));
368        assertFalse(oldCachedRegions1.contains(plan.getRegionInfo()));
369      }
370    }
371  }
372
373  @Test
374  public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() throws Exception {
375    // The regions are fully cached on old server
376
377    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
378    ServerName server0 = servers.get(0);
379    ServerName server1 = servers.get(1);
380    ServerName server2 = servers.get(2);
381
382    // Simulate that the regions previously hosted by server1 are now hosted on server0
383    List<RegionInfo> regionsOnServer0 = randomRegions(10);
384    List<RegionInfo> regionsOnServer1 = randomRegions(0);
385    List<RegionInfo> regionsOnServer2 = randomRegions(5);
386
387    clusterState.put(server0, regionsOnServer0);
388    clusterState.put(server1, regionsOnServer1);
389    clusterState.put(server2, regionsOnServer2);
390
391    // Mock cluster metrics
392
393    // Mock 5 regions from server0 were previously hosted on server1
394    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);
395
396    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
397    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
398      0.0f, new ArrayList<>(), 0, 10));
399    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
400      0.0f, oldCachedRegions, 10, 10));
401    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
402      0.0f, new ArrayList<>(), 0, 10));
403    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
404    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
405    loadBalancer.updateClusterMetrics(clusterMetrics);
406
407    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
408      (Map) mockClusterServersWithTables(clusterState);
409    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
410    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
411    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
412    for (RegionPlan plan : plans) {
413      if (plan.getSource().equals(server0)) {
414        regionsMovedFromServer0.add(plan.getRegionInfo());
415        if (!targetServers.containsKey(plan.getDestination())) {
416          targetServers.put(plan.getDestination(), new ArrayList<>());
417        }
418        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
419      }
420    }
421    // should move 5 regions from server0 to server1
422    assertEquals(5, regionsMovedFromServer0.size());
423    assertEquals(5, targetServers.get(server1).size());
424    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
425  }
426
427  @Test
428  public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception {
429    // The regions are fully cached on old server
430
431    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
432    ServerName server0 = servers.get(0);
433    ServerName server1 = servers.get(1);
434    ServerName server2 = servers.get(2);
435
436    // Simulate that the regions previously hosted by server1 are now hosted on server0
437    List<RegionInfo> regionsOnServer0 = randomRegions(10);
438    List<RegionInfo> regionsOnServer1 = randomRegions(0);
439    List<RegionInfo> regionsOnServer2 = randomRegions(5);
440
441    clusterState.put(server0, regionsOnServer0);
442    clusterState.put(server1, regionsOnServer1);
443    clusterState.put(server2, regionsOnServer2);
444
445    // Mock cluster metrics
446
447    // Mock 5 regions from server0 were previously hosted on server1
448    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);
449
450    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
451    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
452      1.0f, new ArrayList<>(), 0, 10));
453    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
454      1.0f, oldCachedRegions, 10, 10));
455    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
456      1.0f, new ArrayList<>(), 0, 10));
457    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
458    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
459    loadBalancer.updateClusterMetrics(clusterMetrics);
460
461    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
462      (Map) mockClusterServersWithTables(clusterState);
463    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
464    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
465    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
466    for (RegionPlan plan : plans) {
467      if (plan.getSource().equals(server0)) {
468        regionsMovedFromServer0.add(plan.getRegionInfo());
469        if (!targetServers.containsKey(plan.getDestination())) {
470          targetServers.put(plan.getDestination(), new ArrayList<>());
471        }
472        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
473      }
474    }
475    // should move 5 regions from server0 to server1
476    assertEquals(5, regionsMovedFromServer0.size());
477    assertEquals(5, targetServers.get(server1).size());
478    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
479  }
480
481  @Test
482  public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws Exception {
483    // The regions are partially cached on old server
484
485    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
486    ServerName server0 = servers.get(0);
487    ServerName server1 = servers.get(1);
488    ServerName server2 = servers.get(2);
489
490    // Simulate that the regions previously hosted by server1 are now hosted on server0
491    List<RegionInfo> regionsOnServer0 = randomRegions(10);
492    List<RegionInfo> regionsOnServer1 = randomRegions(0);
493    List<RegionInfo> regionsOnServer2 = randomRegions(5);
494
495    clusterState.put(server0, regionsOnServer0);
496    clusterState.put(server1, regionsOnServer1);
497    clusterState.put(server2, regionsOnServer2);
498
499    // Mock cluster metrics
500
501    // Mock 5 regions from server0 were previously hosted on server1
502    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size() - 1);
503
504    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
505    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
506      0.2f, new ArrayList<>(), 0, 10));
507    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
508      0.0f, oldCachedRegions, 6, 10));
509    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
510      1.0f, new ArrayList<>(), 0, 10));
511    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
512    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
513    loadBalancer.updateClusterMetrics(clusterMetrics);
514
515    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
516      (Map) mockClusterServersWithTables(clusterState);
517    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
518    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
519    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
520    for (RegionPlan plan : plans) {
521      if (plan.getSource().equals(server0)) {
522        regionsMovedFromServer0.add(plan.getRegionInfo());
523        if (!targetServers.containsKey(plan.getDestination())) {
524          targetServers.put(plan.getDestination(), new ArrayList<>());
525        }
526        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
527      }
528    }
529    assertEquals(5, regionsMovedFromServer0.size());
530    assertEquals(5, targetServers.get(server1).size());
531    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
532  }
533
534  @Test(timeout = 60000)
535  public void testConfigUpdateDuringBalance() throws Exception {
536    float expectedOldRatioThreshold = 0.8f;
537    float expectedNewRatioThreshold = 0.95f;
538    long throttlingTimeMillis = 10000;
539
540    conf = HBaseConfiguration.create();
541    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
542    conf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedOldRatioThreshold);
543    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
544    conf.set("hbase.rsgroup.grouploadbalancer.class",
545      CacheAwareLoadBalancer.class.getCanonicalName());
546
547    RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
548    balancer.setMasterServices(getMockedMaster());
549    balancer.initialize();
550
551    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
552    ServerName server0 = servers.get(0);
553    ServerName server1 = servers.get(1);
554    ServerName server2 = servers.get(2);
555
556    // Setup cluster: all 3 regions on server0 (unbalanced)
557    List<RegionInfo> regionsOnServer0 = randomRegions(3);
558    List<RegionInfo> regionsOnServer1 = randomRegions(0);
559    List<RegionInfo> regionsOnServer2 = randomRegions(0);
560
561    clusterState.put(server0, regionsOnServer0);
562    clusterState.put(server1, regionsOnServer1);
563    clusterState.put(server2, regionsOnServer2);
564
565    // Mock metrics: NO cache info for any region = all will be throttled
566    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
567    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
568      0.0f, new ArrayList<>(), 0, 10));
569    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
570      0.0f, new ArrayList<>(), 0, 10));
571    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
572      0.0f, new ArrayList<>(), 0, 10));
573
574    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
575    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
576    balancer.updateClusterMetrics(clusterMetrics);
577
578    final Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
579      (Map) mockClusterServersWithTables(clusterState);
580
581    // Verify initial configuration is set correctly
582    CacheAwareLoadBalancer internalBalancer =
583      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
584    assertEquals(expectedOldRatioThreshold, internalBalancer.ratioThreshold, 0.001f);
585
586    CountDownLatch balanceStarted = new CountDownLatch(1);
587    CountDownLatch configUpdateInitiated = new CountDownLatch(1);
588    long[] configUpdateDuration = new long[1];
589    long[] balanceDuration = new long[1];
590
591    // Actual old ratio threshold used during balance
592    float[] actualOldRatioThresholdDuringBalance = new float[1];
593
594    ExecutorService executor = Executors.newFixedThreadPool(2);
595
596    try {
597      // Thread 1 Simulate similar flow to HMaster.balance() which holds synchronized(balancer) for
598      // the duration of balance
599      Future<Long> balanceFuture = executor.submit(() -> {
600        try {
601          long start = EnvironmentEdgeManager.currentTime();
602          synchronized (balancer) {
603            try {
604              // Simulate beginning of HMaster.balance() mark balancing window open
605              balancer.onBalancingStart();
606              balanceStarted.countDown();
607
608              List<RegionPlan> plans = balancer.balanceCluster(loadOfAllTable);
609              if (plans != null) {
610                for (RegionPlan plan : plans) {
611                  balancer.throttle(plan);
612                }
613              }
614              // Wait until config update is initiated while balance is still in progress
615              configUpdateInitiated.await();
616
617              // Old config should still be visible during current balance run
618              CacheAwareLoadBalancer currentInternal =
619                (CacheAwareLoadBalancer) balancer.getInternalBalancer();
620              actualOldRatioThresholdDuringBalance[0] = currentInternal.ratioThreshold;
621
622            } finally {
623              balancer.onBalancingComplete();
624            }
625          }
626          return EnvironmentEdgeManager.currentTime() - start;
627        } catch (Exception e) {
628          throw new RuntimeException(e);
629        }
630      });
631
632      // Thread 2: Simulate update_all_config / onConfigurationChange
633      Future<Long> configUpdateFuture = executor.submit(() -> {
634        try {
635          long startTime = System.currentTimeMillis();
636          // Wait for balance to start
637          balanceStarted.await();
638
639          // Call onConfigurationChange - should NOT hang
640          Configuration newConf = new Configuration(conf);
641          newConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
642          newConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 10000);
643          newConf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedNewRatioThreshold);
644          balancer.onConfigurationChange(newConf);
645          configUpdateInitiated.countDown();
646
647          return System.currentTimeMillis() - startTime;
648        } catch (Exception e) {
649          throw new RuntimeException(e);
650        }
651      });
652
653      // Wait for both threads to complete
654      configUpdateDuration[0] = configUpdateFuture.get();
655      balanceDuration[0] = balanceFuture.get();
656
657      // Verify that config update didn't hang/timeout waiting for balance
658      assertTrue(configUpdateDuration[0] < balanceDuration[0]);
659
660      // Verify that ratio threshold used during balance is stll the old
661      assertEquals(expectedOldRatioThreshold, actualOldRatioThresholdDuringBalance[0], 0.001f);
662
663      // Verify that config updated successfully after balance completed
664      internalBalancer = (CacheAwareLoadBalancer) balancer.getInternalBalancer();
665      assertEquals(expectedNewRatioThreshold, internalBalancer.ratioThreshold, 0.001f);
666
667    } finally {
668      executor.shutdownNow();
669    }
670  }
671}