/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import static org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MAX_RUNNING_TIME_KEY;
import static org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CandidateGeneratorTestUtil {

  private static final Logger LOG = LoggerFactory.getLogger(CandidateGeneratorTestUtil.class);

  private CandidateGeneratorTestUtil() {
  }

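  /**
   * How a balancer test run is expected to terminate: either the cost goal is achieved, or the
   * balancer simply runs out of moves to propose.
   */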
  enum ExhaustionType {
    COST_GOAL_ACHIEVED,
    NO_MORE_MOVES;
  }

  static void runBalancerToExhaustion(Configuration conf,
    Map<ServerName, List<RegionInfo>> serverToRegions,
    Set<Function<BalancerClusterState, Boolean>> expectations, float targetMaxBalancerCost) {
    runBalancerToExhaustion(conf, serverToRegions, expectations, targetMaxBalancerCost, 15000,
      ExhaustionType.COST_GOAL_ACHIEVED);
  }

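  /**
   * Repeatedly runs the balancer against the given assignment map, applying any region plans it
   * produces, until every expectation holds and the requested exhaustion condition is met. Throws
   * a RuntimeException if that does not happen within 10 balancer runs.
   * <p>
   * A minimal usage sketch (the {@code serverToRegions} map and the 0.05 cost target are
   * hypothetical):
   *
   * <pre>{@code
   * Configuration conf = new Configuration(false);
   * Set<Function<BalancerClusterState, Boolean>> expectations = new HashSet<>();
   * expectations.add(CandidateGeneratorTestUtil::areAllReplicasDistributed);
   * runBalancerToExhaustion(conf, serverToRegions, expectations, 0.05f, 15_000,
   *   ExhaustionType.COST_GOAL_ACHIEVED);
   * }</pre>
   */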
  static void runBalancerToExhaustion(Configuration conf,
    Map<ServerName, List<RegionInfo>> serverToRegions,
    Set<Function<BalancerClusterState, Boolean>> expectations, float targetMaxBalancerCost,
    long maxRunningTime, ExhaustionType exhaustionType) {
    // Do the full plan. We're testing with a lot of regions.
    conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
    conf.setLong(MAX_RUNNING_TIME_KEY, maxRunningTime);

    conf.setFloat(MIN_COST_NEED_BALANCE_KEY, targetMaxBalancerCost);
    BalancerClusterState cluster = createMockBalancerClusterState(serverToRegions);
    StochasticLoadBalancer stochasticLoadBalancer = buildStochasticLoadBalancer(cluster, conf);
    printClusterDistribution(cluster, 0);
    int balancerRuns = 0;
    int actionsTaken = 0;
    long balancingMillis = 0;
    boolean isBalanced = false;
    while (!isBalanced) {
      balancerRuns++;
      if (balancerRuns > 10) {
        throw new RuntimeException("Balancer failed to find balance & meet expectations");
      }
      long start = System.currentTimeMillis();
      List<RegionPlan> regionPlans =
        stochasticLoadBalancer.balanceCluster(partitionRegionsByTable(serverToRegions));
      balancingMillis += System.currentTimeMillis() - start;
      actionsTaken++;
      if (regionPlans != null) {
        // Apply all plans to serverToRegions
        for (RegionPlan rp : regionPlans) {
          ServerName source = rp.getSource();
          ServerName dest = rp.getDestination();
          RegionInfo region = rp.getRegionInfo();

          // Update serverToRegions
          serverToRegions.get(source).remove(region);
          serverToRegions.get(dest).add(region);
          actionsTaken++;
        }

        // Now rebuild cluster and balancer from updated serverToRegions
        cluster = createMockBalancerClusterState(serverToRegions);
        stochasticLoadBalancer = buildStochasticLoadBalancer(cluster, conf);
      }
      printClusterDistribution(cluster, actionsTaken);
      isBalanced = true;
      for (Function<BalancerClusterState, Boolean> condition : expectations) {
        // Check if we've met all expectations for the candidate generator
        if (!condition.apply(cluster)) {
          isBalanced = false;
          break;
        }
      }
      if (isBalanced) { // Check if the balancer thinks we're done too
        if (exhaustionType == ExhaustionType.COST_GOAL_ACHIEVED) {
          // If we expect to achieve the cost goal, then needsBalance should be false
          if (stochasticLoadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME, cluster)) {
            LOG.info("Balancer cost goal is not achieved. needsBalance=true");
            isBalanced = false;
          }
        } else {
          // If we anticipate running out of moves, then our last balance run should have produced
          // nothing
          if (regionPlans != null && !regionPlans.isEmpty()) {
            LOG.info("Balancer is not out of moves. regionPlans.size()={}", regionPlans.size());
            isBalanced = false;
          }
        }
      }
    }
    LOG.info("Balancer is done. Balancing took {}sec",
      Duration.ofMillis(balancingMillis).toSeconds());
  }

  /**
   * Prints the current cluster distribution of regions per table per server.
   */
  static void printClusterDistribution(BalancerClusterState cluster, long actionsTaken) {
    LOG.info("=== Cluster Distribution after {} balancer actions taken ===", actionsTaken);

    for (int i = 0; i < cluster.numServers; i++) {
      int[] regions = cluster.regionsPerServer[i];
      int regionCount = (regions == null) ? 0 : regions.length;

      LOG.info("Server {}: {} regions", cluster.servers[i].getServerName(), regionCount);

      if (regionCount > 0) {
        Map<TableName, Integer> tableRegionCounts = new HashMap<>();

        for (int regionIndex : regions) {
          RegionInfo regionInfo = cluster.regions[regionIndex];
          TableName tableName = regionInfo.getTable();
          tableRegionCounts.put(tableName, tableRegionCounts.getOrDefault(tableName, 0) + 1);
        }

        tableRegionCounts
          .forEach((table, count) -> LOG.info("  - Table {}: {} regions", table, count));
      }
    }

    LOG.info("===========================================");
  }

  /**
   * Partitions the given serverToRegions map by table. The tables are derived from the RegionInfo
   * objects found in serverToRegions.
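   * <p>
   * A sketch of the resulting shape, with hypothetical server and region names:
   *
   * <pre>{@code
   * // Input:  {s1=[t1r1, t2r1], s2=[t1r2]}
   * // Output: {t1={s1=[t1r1], s2=[t1r2]}, t2={s1=[t2r1], s2=[]}}
   * Map<TableName, Map<ServerName, List<RegionInfo>>> byTable =
   *   partitionRegionsByTable(serverToRegions);
   * }</pre>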
   * @param serverToRegions The map of servers to their assigned regions.
   * @return A map of tables to their server-to-region assignments.
   */
  public static Map<TableName, Map<ServerName, List<RegionInfo>>>
    partitionRegionsByTable(Map<ServerName, List<RegionInfo>> serverToRegions) {

    // First, gather all tables from the regions
    Set<TableName> allTables = new HashSet<>();
    for (List<RegionInfo> regions : serverToRegions.values()) {
      for (RegionInfo region : regions) {
        allTables.add(region.getTable());
      }
    }

    Map<TableName, Map<ServerName, List<RegionInfo>>> tablesToServersToRegions = new HashMap<>();

    // Initialize each table with all servers mapped to empty lists
    for (TableName table : allTables) {
      Map<ServerName, List<RegionInfo>> serverMap = new HashMap<>();
      for (ServerName server : serverToRegions.keySet()) {
        serverMap.put(server, new ArrayList<>());
      }
      tablesToServersToRegions.put(table, serverMap);
    }

    // Distribute regions to their respective tables
    for (Map.Entry<ServerName, List<RegionInfo>> serverAndRegions : serverToRegions.entrySet()) {
      ServerName server = serverAndRegions.getKey();
      List<RegionInfo> regions = serverAndRegions.getValue();

      for (RegionInfo region : regions) {
        TableName regionTable = region.getTable();
        // Now we know for sure regionTable is in allTables
        Map<ServerName, List<RegionInfo>> tableServerMap =
          tablesToServersToRegions.get(regionTable);
        tableServerMap.get(server).add(region);
      }
    }

    return tablesToServersToRegions;
  }

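  /**
   * Builds a StochasticLoadBalancer wired to dummy metrics and a dummy cluster info provider,
   * configured from the given conf and primed with the given cluster's costs.
   */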
  static StochasticLoadBalancer buildStochasticLoadBalancer(BalancerClusterState cluster,
    Configuration conf) {
    StochasticLoadBalancer stochasticLoadBalancer =
      new StochasticLoadBalancer(new DummyMetricsStochasticBalancer());
    stochasticLoadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
    stochasticLoadBalancer.loadConf(conf);
    stochasticLoadBalancer.initCosts(cluster);
    return stochasticLoadBalancer;
  }

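  /**
   * Builds a minimal BalancerClusterState from the given assignment map, leaving the remaining
   * constructor arguments null; these tests only need the region-to-server topology.
   */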
  static BalancerClusterState
    createMockBalancerClusterState(Map<ServerName, List<RegionInfo>> serverToRegions) {
    return new BalancerClusterState(serverToRegions, null, null, null, null);
  }

  /**
   * Validates that each replica is isolated from its peer replicas. Ensures that no server hosts
   * more than one replica of the same region (i.e., regions with identical start and end keys).
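   * <p>
   * Typically supplied as a balancer expectation, e.g.
   * {@code expectations.add(CandidateGeneratorTestUtil::areAllReplicasDistributed)}.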
   * @param cluster The current state of the cluster.
   * @return true if all replicas are properly isolated, false otherwise.
   */
  static boolean areAllReplicasDistributed(BalancerClusterState cluster) {
    // Iterate over each server
    for (int[] regionsPerServer : cluster.regionsPerServer) {
      if (regionsPerServer == null || regionsPerServer.length == 0) {
        continue; // Skip empty servers
      }

      Set<ReplicaKey> foundKeys = new HashSet<>();
      for (int regionIndex : regionsPerServer) {
        RegionInfo regionInfo = cluster.regions[regionIndex];
        ReplicaKey replicaKey = new ReplicaKey(regionInfo);
        // Set#add returns false if the key was already present on this server
        if (!foundKeys.add(replicaKey)) {
          // Violation: Multiple replicas of the same region on the same server
          LOG.warn("Replica isolation violated: one server hosts multiple replicas of key [{}].",
            generateRegionKey(regionInfo));
          return false;
        }
      }
    }

    LOG.info(
      "Replica isolation validation passed: No server hosts multiple replicas of the same region.");
    return true;
  }

  /**
   * Validates that the given table is isolated: no server may host regions of this table alongside
   * regions of any other table.
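   * <p>
   * Typically supplied as a balancer expectation, e.g. with a hypothetical table name:
   *
   * <pre>{@code
   * expectations.add(cluster -> isTableIsolated(cluster, TableName.valueOf("system"), "System"));
   * }</pre>
   * @param cluster   The current state of the cluster.
   * @param tableName The table whose regions must not share servers with other tables' regions.
   * @param tableType A human-readable label for the table, used only in log messages.
   * @return true if the table is isolated on every server, false otherwise.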
   */
  static boolean isTableIsolated(BalancerClusterState cluster, TableName tableName,
    String tableType) {
    for (int i = 0; i < cluster.numServers; i++) {
      int[] regionsOnServer = cluster.regionsPerServer[i];
      if (regionsOnServer == null || regionsOnServer.length == 0) {
        continue; // Skip empty servers
      }

      boolean hasTargetTableRegion = false;
      boolean hasOtherTableRegion = false;

      for (int regionIndex : regionsOnServer) {
        RegionInfo regionInfo = cluster.regions[regionIndex];
        if (regionInfo.getTable().equals(tableName)) {
          hasTargetTableRegion = true;
        } else {
          hasOtherTableRegion = true;
        }

        // If the target table and any other table are on the same server, isolation is violated
        if (hasTargetTableRegion && hasOtherTableRegion) {
          LOG.debug(
            "Server {} has both {} table regions and other table regions, violating isolation.",
            cluster.servers[i].getServerName(), tableType);
          return false;
        }
      }
    }
    LOG.debug("{} table isolation validation passed.", tableType);
    return true;
  }

  /**
   * Generates a unique key for a region based on its start and end keys. This method ensures that
   * regions with identical start and end keys have the same key.
   * @param regionInfo The RegionInfo object.
   * @return A string representing the unique key of the region.
   */
  private static String generateRegionKey(RegionInfo regionInfo) {
    // Using Base64 encoding for byte arrays to ensure uniqueness and readability
    String startKey = Base64.getEncoder().encodeToString(regionInfo.getStartKey());
    String endKey = Base64.getEncoder().encodeToString(regionInfo.getEndKey());

    return regionInfo.getTable().getNameAsString() + ":" + startKey + ":" + endKey;
  }

}