001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020/** An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions
021 * based on the amount they are cached on a given server. A region can move across the region
022 * servers whenever a region server shuts down or crashes. The region server preserves the cache
023 * periodically and restores the cache when it is restarted. This balancer implements a mechanism
024 * where it maintains the amount by which a region is cached on a region server. During balancer
025 * run, a region plan is generated that takes into account this cache information and tries to
026 * move the regions so that the cache is minimally impacted.
027 */
028
029import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
030
031import java.math.BigDecimal;
032import java.util.ArrayDeque;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Deque;
036import java.util.HashMap;
037import java.util.List;
038import java.util.Map;
039import java.util.Optional;
040import java.util.concurrent.ThreadLocalRandom;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.ClusterMetrics;
043import org.apache.hadoop.hbase.RegionMetrics;
044import org.apache.hadoop.hbase.ServerMetrics;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.Size;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.master.RackManager;
050import org.apache.hadoop.hbase.master.RegionPlan;
051import org.apache.hadoop.hbase.util.Pair;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056@InterfaceAudience.Private
057public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
058  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);
059
  /**
   * Configuration key for the cache ratio at or above which a move of a region back to the server
   * that previously cached it is not throttled (see {@link #getThrottleDurationMs(RegionPlan)}).
   */
  public static final String CACHE_RATIO_THRESHOLD =
    "hbase.master.balancer.stochastic.throttling.cacheRatio";
  /** Value used when {@link #CACHE_RATIO_THRESHOLD} is not present in the configuration. */
  public static final float CACHE_RATIO_THRESHOLD_DEFAULT = 0.8f;
063
064  /**
065   * Below this cache ratio on the current host, a move may be considered for the free-space
066   * heuristic.
067   */
068  public static final String LOW_CACHE_RATIO_FOR_RELOCATION_KEY =
069    "hbase.master.balancer.cacheaware.lowCacheRatioThreshold";
070  public static final float LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT = 0.35f;
071
072  /**
073   * Optimistic region cache ratio assumed for cost purposes when a better host has free cache space
074   * (actual warmup is not modeled).
075   */
076  public static final String POTENTIAL_CACHE_RATIO_AFTER_MOVE_KEY =
077    "hbase.master.balancer.cacheaware.potentialCacheRatioAfterMove";
078  public static final float POTENTIAL_CACHE_RATIO_AFTER_MOVE_DEFAULT = 0.95f;
079
080  /**
081   * Minimum free block cache on a target server, as a multiple of the region's on-disk size in
082   * bytes, required to count that server as a relocation opportunity.
083   */
084  public static final String MIN_FREE_CACHE_SPACE_FACTOR_KEY =
085    "hbase.master.balancer.cacheaware.minFreeCacheSpaceFactor";
086  public static final float MIN_FREE_CACHE_SPACE_FACTOR_DEFAULT = 1.0f;
087
  // Cache ratio at or above which getThrottleDurationMs returns 0 for a move back to the server
  // that previously cached the region. Loaded in loadConf from CACHE_RATIO_THRESHOLD.
  public Float ratioThreshold;

  // Throttle duration in milliseconds returned by getThrottleDurationMs. Loaded in loadConf from
  // MOVE_THROTTLING.
  private Long sleepTime;
  // The configuration most recently passed to loadConf.
  private Configuration configuration;

  // Loaded in loadConf from LOW_CACHE_RATIO_FOR_RELOCATION_KEY,
  // POTENTIAL_CACHE_RATIO_AFTER_MOVE_KEY and MIN_FREE_CACHE_SPACE_FACTOR_KEY respectively.
  private float lowCacheRatioThreshold;
  private float potentialCacheRatioAfterMove;
  private float minFreeCacheSpaceFactor;

  // Normalized cache ratio delta recorded by CacheAwareCostFunction.regionMoved for the last
  // simulated move. Both candidate generators reset it to ZERO before producing an action;
  // regionMoved treats a value that is not equal to ZERO as the signal that it is undoing a move.
  private BigDecimal simulatedRatio = BigDecimal.ZERO;
098
099  @Override
100  public void loadConf(Configuration configuration) {
101    this.configuration = configuration;
102    this.costFunctions = new ArrayList<>();
103    super.loadConf(configuration);
104    ratioThreshold =
105      this.configuration.getFloat(CACHE_RATIO_THRESHOLD, CACHE_RATIO_THRESHOLD_DEFAULT);
106    sleepTime = configuration.getLong(MOVE_THROTTLING, MOVE_THROTTLING_DEFAULT.toMillis());
107    lowCacheRatioThreshold = configuration.getFloat(LOW_CACHE_RATIO_FOR_RELOCATION_KEY,
108      LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT);
109    potentialCacheRatioAfterMove = configuration.getFloat(POTENTIAL_CACHE_RATIO_AFTER_MOVE_KEY,
110      POTENTIAL_CACHE_RATIO_AFTER_MOVE_DEFAULT);
111    minFreeCacheSpaceFactor =
112      configuration.getFloat(MIN_FREE_CACHE_SPACE_FACTOR_KEY, MIN_FREE_CACHE_SPACE_FACTOR_DEFAULT);
113  }
114
115  @Override
116  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
117    createCandidateGenerators(Configuration conf) {
118    Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
119      new HashMap<>(2);
120    candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class,
121      new CacheAwareSkewnessCandidateGenerator());
122    candidateGenerators.put(CacheAwareCandidateGenerator.class, new CacheAwareCandidateGenerator());
123    return candidateGenerators;
124  }
125
126  @Override
127  protected List<CostFunction> createCostFunctions(Configuration configuration) {
128    List<CostFunction> costFunctions = new ArrayList<>();
129    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
130    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
131    return costFunctions;
132  }
133
134  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
135    if (costFunction.getMultiplier() > 0) {
136      costFunctions.add(costFunction);
137    }
138  }
139
140  @Override
141  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
142    this.clusterStatus = clusterMetrics;
143    updateRegionLoad();
144  }
145
146  protected Map<ServerName, Long> getServerBlockCacheFreeBytes() {
147    if (clusterStatus == null) {
148      return null;
149    }
150    Map<ServerName, Long> map = new HashMap<>();
151    clusterStatus.getLiveServerMetrics().forEach((sn, sm) -> map.put(sn, sm.getCacheFreeSize()));
152    return map;
153  }
154
155  @Override
156  protected BalancerClusterState createState(Map<ServerName, List<RegionInfo>> clusterState,
157    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder finder,
158    RackManager rackManager) {
159    return new BalancerClusterState(clusterState, loads, finder, rackManager,
160      regionCacheRatioOnOldServerMap, getServerBlockCacheFreeBytes());
161  }
162
163  /**
164   * Collect the amount of region cached for all the regions from all the active region servers.
165   */
166  private void updateRegionLoad() {
167    loads = new HashMap<>();
168    regionCacheRatioOnOldServerMap = new HashMap<>();
169    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();
170
171    // Build current region cache statistics
172    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
173      // Create a map of region and the server where it is currently hosted
174      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
175        String regionEncodedName = RegionInfo.encodeRegionName(regionName);
176
177        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();
178
179        // Get the total size of the hFiles in this region
180        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
181
182        rload.add(new BalancerRegionLoad(rm));
183        // Maintain a map of region and its total size. This is needed to calculate the cache
184        // ratios for the regions cached on old region servers
185        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
186        loads.put(regionEncodedName, rload);
187      });
188    });
189
190    // Build cache statistics for the regions hosted previously on old region servers
191    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
192      // Find if a region was previously hosted on a server other than the one it is currently
193      // hosted on.
194      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
195        // If the region is found in regionCacheRatioOnCurrentServerMap, it is currently hosted on
196        // this server
197        if (regionCacheRatioOnCurrentServerMap.containsKey(regionEncodedName)) {
198          ServerName currentServer =
199            regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getFirst();
200          if (!ServerName.isSameAddress(currentServer, sn)) {
201            int regionSizeMB =
202              regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond();
203            // The coldDataSize accounts for data size classified as "cold" by DataTieringManager,
204            // which should be kept out of cache. We calculate cache ratio on old server based
205            // only on the hot data size for the region (regionSizeMB - coldDataSize), as we
206            // don't want to move regions with low cache ratio due to data classified as cold.
207            int coldDataSize = sm.getRegionColdDataSize().getOrDefault(regionEncodedName, 0);
208            float regionCacheRatioOnOldServer = (regionSizeMB - coldDataSize) <= 0
209              ? 0.0f
210              : (float) regionSizeInCache / (regionSizeMB - coldDataSize);
211            regionCacheRatioOnOldServerMap.put(regionEncodedName,
212              new Pair<>(sn, regionCacheRatioOnOldServer));
213          }
214        }
215      });
216    });
217  }
218
219  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
220    Optional<RegionInfo> regionInfoOptional =
221      Arrays.stream(cluster.regions).filter((RegionInfo ri) -> {
222        return regionName.equals(ri.getEncodedName());
223      }).findFirst();
224
225    if (regionInfoOptional.isPresent()) {
226      return regionInfoOptional.get();
227    }
228    return null;
229  }
230
231  @Override
232  public long getThrottleDurationMs(RegionPlan plan) {
233    Pair<ServerName, Float> rsRatio = this.regionCacheRatioOnOldServerMap.get(plan.getRegionName());
234    if (
235      rsRatio != null && plan.getDestination().equals(rsRatio.getFirst())
236        && rsRatio.getSecond() >= ratioThreshold
237    ) {
238      LOG.debug("Moving region {} to server {} with cache ratio {}. No throttling needed.",
239        plan.getRegionInfo().getEncodedName(), plan.getDestination(), rsRatio.getSecond());
240      return 0L;
241    } else {
242      if (rsRatio != null) {
243        LOG.debug("Moving region {} to server {} with cache ratio: {}. Throttling move for {}ms.",
244          plan.getRegionInfo().getEncodedName(), plan.getDestination(),
245          plan.getDestination().equals(rsRatio.getFirst()) ? rsRatio.getSecond() : "unknown",
246          sleepTime);
247      } else {
248        LOG.debug(
249          "Moving region {} to server {} with no cache ratio info for the region. "
250            + "Throttling move for {}ms.",
251          plan.getRegionInfo().getEncodedName(), plan.getDestination(), sleepTime);
252      }
253      return sleepTime;
254    }
255  }
256
257  @Override
258  protected List<RegionPlan> balanceTable(TableName tableName,
259    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
260    final Map<String, Pair<ServerName, Float>> snapshot = new HashMap<>();
261    snapshot.putAll(this.regionCacheRatioOnOldServerMap);
262    List<RegionPlan> plans = super.balanceTable(tableName, loadOfOneTable);
263    if (plans == null) {
264      return plans;
265    }
266    plans.sort((p1, p2) -> {
267      Pair<ServerName, Float> pair1 = snapshot.get(p1.getRegionName());
268      Float ratio1 =
269        pair1 == null ? 0 : pair1.getFirst().equals(p1.getDestination()) ? pair1.getSecond() : 0f;
270      Pair<ServerName, Float> pair2 = snapshot.get(p2.getRegionName());
271      Float ratio2 =
272        pair2 == null ? 0 : pair2.getFirst().equals(p2.getDestination()) ? pair2.getSecond() : 0f;
273      return ratio1.compareTo(ratio2) * (-1);
274    });
275    return plans;
276  }
277
278  private class CacheAwareCandidateGenerator extends CandidateGenerator {
279    @Override
280    protected BalanceAction generate(BalancerClusterState cluster) {
281      simulatedRatio = BigDecimal.ZERO;
282      // Move the regions to the servers they were previously hosted on based on the cache ratio
283      if (
284        !regionCacheRatioOnOldServerMap.isEmpty()
285          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
286      ) {
287        Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap =
288          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
289        // Get the server where this region was previously hosted
290        String regionEncodedName = regionCacheRatioServerMap.getKey();
291        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
292        if (regionInfo == null) {
293          LOG.warn("Region {} not found", regionEncodedName);
294          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
295          return BalanceAction.NULL_ACTION;
296        }
297        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
298          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
299          return BalanceAction.NULL_ACTION;
300        }
301        int regionIndex = cluster.regionsToIndex.get(regionInfo);
302        int oldServerIndex = cluster.serversToIndex
303          .get(regionCacheRatioOnOldServerMap.get(regionEncodedName).getFirst().getAddress());
304        if (oldServerIndex < 0) {
305          LOG.warn("Server previously hosting region {} not found", regionEncodedName);
306          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
307          return BalanceAction.NULL_ACTION;
308        }
309
310        float oldRegionCacheRatio =
311          cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
312        int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
313        float currentRegionCacheRatio =
314          cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);
315
316        BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
317          currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
318        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
319        return action;
320      }
321      return generatePlanForFreeCacheSpace(cluster);
322    }
323
324    private BalanceAction generatePlanForFreeCacheSpace(BalancerClusterState cluster) {
325      if (cluster.serverBlockCacheFreeSize == null) {
326        return BalanceAction.NULL_ACTION;
327      }
328      List<BalanceAction> possibleActions = new ArrayList<>();
329      Map<Integer, Long> serverFreeCacheAfterAction = new HashMap<>();
330      for (int region = 0; region < cluster.numRegions; region++) {
331        RegionInfo regionInfo = cluster.regions[region];
332        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
333          continue;
334        }
335        int currentServer = cluster.regionIndexToServerIndex[region];
336        float ratio = cluster.getSumRegionCacheAndColdDataRatio(region);
337        if (ratio >= lowCacheRatioThreshold) {
338          continue;
339        }
340        int regionSizeMb = cluster.getRegionSizeMinusColdDataMB(region);
341        if (regionSizeMb <= 0) {
342          continue;
343        }
344        long bytesNeeded = (long) (regionSizeMb * 1024L * 1024L * minFreeCacheSpaceFactor);
345        for (int server = 0; server < cluster.numServers; server++) {
346          // Skips current server for region, as we can't generate a move to same server
347          if (server == currentServer) {
348            continue;
349          }
350          serverFreeCacheAfterAction.putIfAbsent(server, cluster.serverBlockCacheFreeSize[server]);
351          if (serverFreeCacheAfterAction.get(server) >= bytesNeeded) {
352            serverFreeCacheAfterAction.compute(server, (s, freeCache) -> freeCache - bytesNeeded);
353            possibleActions.add(getAction(currentServer, region, server, -1));
354          }
355        }
356      }
357      if (!possibleActions.isEmpty()) {
358        BalanceAction action =
359          possibleActions.get(ThreadLocalRandom.current().nextInt(possibleActions.size()));
360        LOG.debug("region {} had sum ratio {}",
361          cluster.regions[((MoveRegionAction) action).getRegion()].getEncodedName(),
362          cluster.getSumRegionCacheAndColdDataRatio(((MoveRegionAction) action).getRegion()));
363        return action;
364      }
365      return BalanceAction.NULL_ACTION;
366    }
367
368    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
369      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
370      float cacheRatioOnOldServer) {
371      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
372        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
373          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
374          : generatePlanForFreeCacheSpace(cluster);
375    }
376
377    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
378      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
379      float cacheRatioOnOldServer) {
380      // Find if the region has already moved by comparing the current server index with the
381      // current server index. This can happen when other candidate generator has moved the region
382      if (currentServerIndex < 0 || oldServerIndex < 0) {
383        return false;
384      }
385
386      float cacheRatioDiffThreshold = 0.6f;
387
388      // Conditions for moving the region
389
390      // If the region is fully cached on the old server, move the region back
391      if (cacheRatioOnOldServer == 1.0f) {
392        if (LOG.isDebugEnabled()) {
393          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
394            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
395        }
396        return true;
397      }
398
399      // Move the region back to the old server if it is cached equally on both the servers
400      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
401        if (LOG.isDebugEnabled()) {
402          LOG.debug(
403            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
404            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
405            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
406        }
407        return true;
408      }
409
410      // If the region is not fully cached on either of the servers, move the region back to the
411      // old server if the region cache ratio on the current server is still much less than the old
412      // server
413      if (
414        cacheRatioOnOldServer > 0.0f
415          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold
416      ) {
417        if (LOG.isDebugEnabled()) {
418          LOG.debug(
419            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
420              + "cache ratio {}",
421            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
422            cluster.servers[oldServerIndex], cacheRatioOnOldServer, cacheRatioOnCurrentServer);
423        }
424        return true;
425      }
426
427      if (LOG.isDebugEnabled()) {
428        LOG.debug(
429          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
430          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
431          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
432      }
433      return false;
434    }
435  }
436
437  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
438    @Override
439    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
440      simulatedRatio = BigDecimal.ZERO;
441      // First move all the regions which were hosted previously on some other server back to their
442      // old servers
443      if (
444        !regionCacheRatioOnOldServerMap.isEmpty()
445          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
446      ) {
447        // Get the first region index in the historical cache ratio list
448        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
449          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
450        String regionEncodedName = regionEntry.getKey();
451
452        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
453        if (regionInfo == null) {
454          LOG.warn("Region {} does not exist", regionEncodedName);
455          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
456          return BalanceAction.NULL_ACTION;
457        }
458        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
459          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
460          return BalanceAction.NULL_ACTION;
461        }
462
463        int regionIndex = cluster.regionsToIndex.get(regionInfo);
464
465        // Get the current host name for this region
466        thisServer = cluster.regionIndexToServerIndex[regionIndex];
467
468        // Get the old server index
469        otherServer = cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress());
470
471        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
472
473        if (otherServer < 0) {
474          // The old server has been moved to other host and hence, the region cannot be moved back
475          // to the old server
476          if (LOG.isDebugEnabled()) {
477            LOG.debug(
478              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
479                + "server {} as the server does not exist",
480              regionEncodedName, regionEntry.getValue().getFirst().getHostname());
481          }
482          return BalanceAction.NULL_ACTION;
483        }
484
485        if (LOG.isDebugEnabled()) {
486          LOG.debug(
487            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
488              + "was hosted there earlier",
489            regionEncodedName, cluster.servers[thisServer].getHostname(),
490            cluster.servers[otherServer].getHostname());
491        }
492
493        return getAction(thisServer, regionIndex, otherServer, -1);
494      }
495
496      if (thisServer < 0 || otherServer < 0) {
497        return BalanceAction.NULL_ACTION;
498      }
499
500      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
501      if (regionIndexToMove < 0) {
502        if (LOG.isDebugEnabled()) {
503          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
504        }
505        return BalanceAction.NULL_ACTION;
506      }
507      if (LOG.isDebugEnabled()) {
508        LOG.debug(
509          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
510            + "least cached on current server",
511          cluster.regions[regionIndexToMove].getEncodedName(),
512          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
513      }
514      return getAction(thisServer, regionIndexToMove, otherServer, -1);
515    }
516
517    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
518      float minCacheRatio = Float.MAX_VALUE;
519      int leastCachedRegion = -1;
520      for (int i = 0; i < cluster.regionsPerServer[thisServer].length; i++) {
521        int regionIndex = cluster.regionsPerServer[thisServer][i];
522
523        float cacheRatioOnCurrentServer =
524          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
525        if (cacheRatioOnCurrentServer < minCacheRatio) {
526          minCacheRatio = cacheRatioOnCurrentServer;
527          leastCachedRegion = regionIndex;
528        }
529      }
530      return leastCachedRegion;
531    }
532  }
533
534  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
535    static final String REGION_COUNT_SKEW_COST_KEY =
536      "hbase.master.balancer.stochastic.regionCountCost";
537    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
538    private final DoubleArrayCost cost = new DoubleArrayCost();
539
540    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
541      // Load multiplier should be the greatest as it is the most general way to balance data.
542      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
543    }
544
545    @Override
546    void prepare(BalancerClusterState cluster) {
547      super.prepare(cluster);
548      cost.prepare(cluster.numServers);
549      cost.applyCostsChange(costs -> {
550        for (int i = 0; i < cluster.numServers; i++) {
551          costs[i] = cluster.regionsPerServer[i].length;
552        }
553      });
554    }
555
556    @Override
557    protected double cost() {
558      return cost.cost();
559    }
560
561    @Override
562    protected void regionMoved(int region, int oldServer, int newServer) {
563      cost.applyCostsChange(costs -> {
564        costs[oldServer] = cluster.regionsPerServer[oldServer].length;
565        costs[newServer] = cluster.regionsPerServer[newServer].length;
566      });
567    }
568
569    @Override
570    public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
571      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
572    }
573  }
574
575  class CacheAwareCostFunction extends CostFunction {
    // Configuration key of this cost function's multiplier.
    private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
    // Current weighted cache sum divided by the best achievable one; cost() is derived from
    // 1 - cacheRatio.
    private double cacheRatio;
    // Sum over all regions of the best weighted cache value, set by recomputeCacheRatio and used
    // by regionMoved to normalize deltas. Despite its name this is a sum, not a ratio.
    private double bestCacheRatio;
    // Copies of the balancer settings, read in the constructor. These fields shadow the fields of
    // the same names in the enclosing class.
    private final float lowCacheRatioThreshold;
    private final float potentialCacheRatioAfterMove;
    private final float minFreeCacheSpaceFactor;

    // Multiplier used when CACHE_COST_KEY is not configured.
    private static final float DEFAULT_CACHE_COST = 20;
584
585    CacheAwareCostFunction(Configuration conf) {
586      boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null;
587      // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled
588      this.setMultiplier(
589        !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST));
590      bestCacheRatio = 0.0;
591      cacheRatio = 0.0;
592      lowCacheRatioThreshold =
593        conf.getFloat(LOW_CACHE_RATIO_FOR_RELOCATION_KEY, LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT);
594      potentialCacheRatioAfterMove = Math.min(1.0f, conf
595        .getFloat(POTENTIAL_CACHE_RATIO_AFTER_MOVE_KEY, POTENTIAL_CACHE_RATIO_AFTER_MOVE_DEFAULT));
596      minFreeCacheSpaceFactor =
597        conf.getFloat(MIN_FREE_CACHE_SPACE_FACTOR_KEY, MIN_FREE_CACHE_SPACE_FACTOR_DEFAULT);
598    }
599
600    @Override
601    void prepare(BalancerClusterState cluster) {
602      super.prepare(cluster);
603      recomputeCacheRatio(cluster);
604      if (LOG.isDebugEnabled()) {
605        LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
606      }
607    }
608
609    private void recomputeCacheRatio(BalancerClusterState cluster) {
610      double[] currentWeighted = computeCurrentWeightedContributions(cluster);
611      double currentSum = 0.0;
612      double bestCacheSum = 0.0;
613      for (int region = 0; region < cluster.numRegions; region++) {
614        currentSum += currentWeighted[region];
615        // here we only get the server index where this region cache ratio is the highest
616        int serverIndexBestCache = cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
617        // get the highest cacheRatio for this region on the current state of allocations
618        double currentHighestCache =
619          cluster.getOrComputeWeightedRegionCacheRatio(region, serverIndexBestCache);
620        // Get a hypothetical best cache ratio for this region if any server has enough free cache
621        // to host it.
622        double potentialHighestCache = potentialBestWeightedFromFreeCache(cluster, region);
623        bestCacheSum += Math.max(currentHighestCache, potentialHighestCache);
624      }
625      bestCacheRatio = bestCacheSum;
626      if (bestCacheSum <= 0.0) {
627        cacheRatio = cluster.numRegions == 0 ? 1.0 : 0.0;
628      } else {
629        cacheRatio = Math.min(1.0, currentSum / bestCacheSum);
630      }
631    }
632
633    private double[] computeCurrentWeightedContributions(BalancerClusterState cluster) {
634      int totalRegions = cluster.numRegions;
635      double[] contrib = new double[totalRegions];
636      for (int r = 0; r < totalRegions; r++) {
637        int s = cluster.regionIndexToServerIndex[r];
638        int sizeMb = cluster.getRegionSizeMinusColdDataMB(r);
639        if (sizeMb <= 0) {
640          contrib[r] = 0.0;
641          continue;
642        }
643        boolean movedInSimulation = cluster.initialRegionIndexToServerIndex[r] != s;
644        if (
645          cluster.serverBlockCacheFreeSize != null && movedInSimulation
646            && cluster.getSumRegionCacheAndColdDataRatio(r) < lowCacheRatioThreshold
647        ) {
648          LOG.debug("Region {} is simulated moved to new server {}",
649            cluster.regions[r].getEncodedName(), cluster.servers[s].getHostname());
650          long bytesNeeded = (long) (sizeMb * 1024L * 1024L * minFreeCacheSpaceFactor);
651          if (cluster.serverBlockCacheFreeSize[s] >= bytesNeeded) {
652            contrib[r] = sizeMb * potentialCacheRatioAfterMove;
653            continue;
654          }
655        }
656        contrib[r] = cluster.getOrComputeWeightedRegionCacheRatio(r, s);
657      }
658      return contrib;
659    }
660
661    /*
662     * If this region is cold in metrics and at least one RS (including its current host) reports
663     * enough free block cache to hold it, return an optimistic weighted cache score ({@link
664     * #potentialCacheRatioAfterMove} * region MB) so placement is not considered optimal solely
665     * from low ratios when capacity exists somewhere in the cluster.
666     */
667    private double potentialBestWeightedFromFreeCache(BalancerClusterState cluster, int region) {
668      float observedRatio = cluster.getSumRegionCacheAndColdDataRatio(region);
669      if (observedRatio >= lowCacheRatioThreshold) {
670        return 0.0;
671      }
672      int regionSizeMb = cluster.getRegionSizeMinusColdDataMB(region);
673      if (regionSizeMb <= 0) {
674        return 0.0;
675      }
676      long regionSizeBytes = (long) regionSizeMb * 1024L * 1024L;
677      long requiredFree = (long) (regionSizeBytes * minFreeCacheSpaceFactor);
678      for (int s = 0; s < cluster.numServers; s++) {
679        if (cluster.serverBlockCacheFreeSize[s] >= requiredFree) {
680          return regionSizeMb * potentialCacheRatioAfterMove;
681        }
682      }
683      return 0.0;
684    }
685
686    @Override
687    protected double cost() {
688      return scale(0, 1, 1 - cacheRatio);
689    }
690
691    @Override
692    protected void regionMoved(int region, int oldServer, int newServer) {
693      double regionCacheRatioOnOldServer =
694        cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
695      if (simulatedRatio.equals(BigDecimal.ZERO)) {
696        double potentialCachedSizeOnNewServer =
697          cluster.getRegionSizeMinusColdDataMB(region) * potentialCacheRatioAfterMove;
698        boolean simulateCacheBasedOnFreeSpace =
699          cluster.getOrComputeRegionCacheRatio(region, oldServer) < lowCacheRatioThreshold
700            && cluster.serverBlockCacheFreeSize[newServer] >= potentialCachedSizeOnNewServer;
701        double regionCacheRatioOnNewServer = simulateCacheBasedOnFreeSpace
702          ? potentialCachedSizeOnNewServer
703          : cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
704        double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
705        double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
706        LOG.debug(
707          "simulating moving region {} using simulateCacheBasedOnFreeSpace={} "
708            + "got a normalized delta of {} to be added to cacheRatio: {}",
709          cluster.regions[region].getEncodedName(), simulateCacheBasedOnFreeSpace, normalizedDelta,
710          cacheRatio);
711        simulatedRatio = BigDecimal.valueOf(normalizedDelta);
712        cacheRatio += normalizedDelta;
713        if (cacheRatio < 0.0 || cacheRatio > 1.0) {
714          LOG.info(
715            "Recomputing cacheRatio after calculating impact of region move: \n "
716              + "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:"
717              + "regionCacheRatioOnOldServer:{}:regionCacheRatioOnNewServer:{}:"
718              + "bestRegionCacheRatio:{}:cacheRatio:{}",
719            cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
720            cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
721            regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
722          recomputeCacheRatio(cluster);
723        }
724      } else {
725        // This means we are in an undoAction call and need to reverse the cache delta applied in
726        // the region move simulation
727        cacheRatio -= simulatedRatio.doubleValue();
728      }
729    }
730
731    private int getServerWithBestCacheRatioForRegion(int region) {
732      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
733    }
734
735    @Override
736    public void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
737      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
738    }
739  }
740}