001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020/** An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions
021 * based on the amount they are cached on a given server. A region can move across the region
022 * servers whenever a region server shuts down or crashes. The region server preserves the cache
023 * periodically and restores the cache when it is restarted. This balancer implements a mechanism
 * where it maintains the amount by which a region is cached on a region server. During a balancer
 * run, a region plan is generated that takes this cache information into account and tries to
 * move the regions so that the cache is minimally impacted.
027 */
028
029import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
030
031import java.text.DecimalFormat;
032import java.util.ArrayDeque;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Deque;
036import java.util.HashMap;
037import java.util.List;
038import java.util.Map;
039import java.util.Optional;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.ClusterMetrics;
042import org.apache.hadoop.hbase.RegionMetrics;
043import org.apache.hadoop.hbase.ServerMetrics;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.Size;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.util.Pair;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052@InterfaceAudience.Private
053public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
  /** Logger shared by this balancer and its nested candidate generators and cost functions. */
  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);

  /** The configuration most recently passed to {@link #loadConf(Configuration)}. */
  private Configuration configuration;
057
  /**
   * Names two kinds of generator function: one driven by load and one driven by cache ratio.
   * <p>
   * NOTE(review): this enum is not referenced anywhere inside this class; presumably it is used by
   * code elsewhere -- confirm before removing.
   */
  public enum GeneratorFunctionType {
    LOAD,
    CACHE_RATIO
  }
062
  /**
   * Remembers the supplied configuration, resets the cost function list and then delegates to the
   * parent implementation.
   * @param configuration the configuration to load
   */
  @Override
  public synchronized void loadConf(Configuration configuration) {
    this.configuration = configuration;
    // Start from an empty cost function list before the parent class loads its configuration.
    this.costFunctions = new ArrayList<>();
    super.loadConf(configuration);
  }
069
070  @Override
071  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
072    createCandidateGenerators() {
073    Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
074      new HashMap<>(2);
075    candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class,
076      new CacheAwareSkewnessCandidateGenerator());
077    candidateGenerators.put(CacheAwareCandidateGenerator.class, new CacheAwareCandidateGenerator());
078    return candidateGenerators;
079  }
080
081  @Override
082  protected List<CostFunction> createCostFunctions(Configuration configuration) {
083    List<CostFunction> costFunctions = new ArrayList<>();
084    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
085    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
086    return costFunctions;
087  }
088
089  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
090    if (costFunction.getMultiplier() > 0) {
091      costFunctions.add(costFunction);
092    }
093  }
094
  /**
   * Stores the latest cluster metrics and rebuilds the region load and cache ratio bookkeeping
   * from them.
   * @param clusterMetrics the cluster metrics to record
   */
  @Override
  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
    this.clusterStatus = clusterMetrics;
    // Must run after clusterStatus is assigned: updateRegionLoad() reads the live server metrics
    // from it.
    updateRegionLoad();
  }
100
101  /**
102   * Collect the amount of region cached for all the regions from all the active region servers.
103   */
104  private void updateRegionLoad() {
105    loads = new HashMap<>();
106    regionCacheRatioOnOldServerMap = new HashMap<>();
107    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();
108
109    // Build current region cache statistics
110    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
111      // Create a map of region and the server where it is currently hosted
112      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
113        String regionEncodedName = RegionInfo.encodeRegionName(regionName);
114
115        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();
116
117        // Get the total size of the hFiles in this region
118        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
119
120        rload.add(new BalancerRegionLoad(rm));
121        // Maintain a map of region and it's total size. This is needed to calculate the cache
122        // ratios for the regions cached on old region servers
123        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
124        loads.put(regionEncodedName, rload);
125      });
126    });
127
128    // Build cache statistics for the regions hosted previously on old region servers
129    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
130      // Find if a region was previously hosted on a server other than the one it is currently
131      // hosted on.
132      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
133        // If the region is found in regionCacheRatioOnCurrentServerMap, it is currently hosted on
134        // this server
135        if (regionCacheRatioOnCurrentServerMap.containsKey(regionEncodedName)) {
136          ServerName currentServer =
137            regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getFirst();
138          if (!ServerName.isSameAddress(currentServer, sn)) {
139            int regionSizeMB =
140              regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond();
141            float regionCacheRatioOnOldServer =
142              regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB;
143            regionCacheRatioOnOldServerMap.put(regionEncodedName,
144              new Pair<>(sn, regionCacheRatioOnOldServer));
145          }
146        }
147      });
148    });
149  }
150
151  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
152    Optional<RegionInfo> regionInfoOptional =
153      Arrays.stream(cluster.regions).filter((RegionInfo ri) -> {
154        return regionName.equals(ri.getEncodedName());
155      }).findFirst();
156
157    if (regionInfoOptional.isPresent()) {
158      return regionInfoOptional.get();
159    }
160    return null;
161  }
162
163  private class CacheAwareCandidateGenerator extends CandidateGenerator {
164    @Override
165    protected BalanceAction generate(BalancerClusterState cluster) {
166      // Move the regions to the servers they were previously hosted on based on the cache ratio
167      if (
168        !regionCacheRatioOnOldServerMap.isEmpty()
169          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
170      ) {
171        Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap =
172          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
173        // Get the server where this region was previously hosted
174        String regionEncodedName = regionCacheRatioServerMap.getKey();
175        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
176        if (regionInfo == null) {
177          LOG.warn("Region {} not found", regionEncodedName);
178          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
179          return BalanceAction.NULL_ACTION;
180        }
181        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
182          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
183          return BalanceAction.NULL_ACTION;
184        }
185        int regionIndex = cluster.regionsToIndex.get(regionInfo);
186        int oldServerIndex = cluster.serversToIndex
187          .get(regionCacheRatioOnOldServerMap.get(regionEncodedName).getFirst().getAddress());
188        if (oldServerIndex < 0) {
189          LOG.warn("Server previously hosting region {} not found", regionEncodedName);
190          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
191          return BalanceAction.NULL_ACTION;
192        }
193
194        float oldRegionCacheRatio =
195          cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
196        int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
197        float currentRegionCacheRatio =
198          cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);
199
200        BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
201          currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
202        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
203        return action;
204      }
205      return BalanceAction.NULL_ACTION;
206    }
207
208    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
209      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
210      float cacheRatioOnOldServer) {
211      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
212        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
213          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
214          : BalanceAction.NULL_ACTION;
215    }
216
217    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
218      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
219      float cacheRatioOnOldServer) {
220      // Find if the region has already moved by comparing the current server index with the
221      // current server index. This can happen when other candidate generator has moved the region
222      if (currentServerIndex < 0 || oldServerIndex < 0) {
223        return false;
224      }
225
226      DecimalFormat df = new DecimalFormat("#");
227      df.setMaximumFractionDigits(4);
228
229      float cacheRatioDiffThreshold = 0.6f;
230
231      // Conditions for moving the region
232
233      // If the region is fully cached on the old server, move the region back
234      if (cacheRatioOnOldServer == 1.0f) {
235        if (LOG.isDebugEnabled()) {
236          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
237            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
238        }
239        return true;
240      }
241
242      // Move the region back to the old server if it is cached equally on both the servers
243      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
244        if (LOG.isDebugEnabled()) {
245          LOG.debug(
246            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
247            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
248            cluster.servers[oldServerIndex], df.format(cacheRatioOnCurrentServer));
249        }
250        return true;
251      }
252
253      // If the region is not fully cached on either of the servers, move the region back to the
254      // old server if the region cache ratio on the current server is still much less than the old
255      // server
256      if (
257        cacheRatioOnOldServer > 0.0f
258          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold
259      ) {
260        if (LOG.isDebugEnabled()) {
261          LOG.debug(
262            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
263              + "cache ratio {}",
264            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
265            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer,
266            df.format(cacheRatioOnCurrentServer));
267        }
268        return true;
269      }
270
271      if (LOG.isDebugEnabled()) {
272        LOG.debug(
273          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
274          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
275          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer,
276          df.format(cacheRatioOnCurrentServer));
277      }
278      return false;
279    }
280  }
281
282  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
283    @Override
284    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
285      // First move all the regions which were hosted previously on some other server back to their
286      // old servers
287      if (
288        !regionCacheRatioOnOldServerMap.isEmpty()
289          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
290      ) {
291        // Get the first region index in the historical cache ratio list
292        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
293          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
294        String regionEncodedName = regionEntry.getKey();
295
296        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
297        if (regionInfo == null) {
298          LOG.warn("Region {} does not exist", regionEncodedName);
299          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
300          return BalanceAction.NULL_ACTION;
301        }
302        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
303          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
304          return BalanceAction.NULL_ACTION;
305        }
306
307        int regionIndex = cluster.regionsToIndex.get(regionInfo);
308
309        // Get the current host name for this region
310        thisServer = cluster.regionIndexToServerIndex[regionIndex];
311
312        // Get the old server index
313        otherServer = cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress());
314
315        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
316
317        if (otherServer < 0) {
318          // The old server has been moved to other host and hence, the region cannot be moved back
319          // to the old server
320          if (LOG.isDebugEnabled()) {
321            LOG.debug(
322              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
323                + "server {} as the server does not exist",
324              regionEncodedName, regionEntry.getValue().getFirst().getHostname());
325          }
326          return BalanceAction.NULL_ACTION;
327        }
328
329        if (LOG.isDebugEnabled()) {
330          LOG.debug(
331            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
332              + "was hosted their earlier",
333            regionEncodedName, cluster.servers[thisServer].getHostname(),
334            cluster.servers[otherServer].getHostname());
335        }
336
337        return getAction(thisServer, regionIndex, otherServer, -1);
338      }
339
340      if (thisServer < 0 || otherServer < 0) {
341        return BalanceAction.NULL_ACTION;
342      }
343
344      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
345      if (regionIndexToMove < 0) {
346        if (LOG.isDebugEnabled()) {
347          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
348        }
349        return BalanceAction.NULL_ACTION;
350      }
351      if (LOG.isDebugEnabled()) {
352        LOG.debug(
353          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
354            + "least cached on current server",
355          cluster.regions[regionIndexToMove].getEncodedName(),
356          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
357      }
358      return getAction(thisServer, regionIndexToMove, otherServer, -1);
359    }
360
361    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
362      float minCacheRatio = Float.MAX_VALUE;
363      int leastCachedRegion = -1;
364      for (int i = 0; i < cluster.regionsPerServer[thisServer].length; i++) {
365        int regionIndex = cluster.regionsPerServer[thisServer][i];
366
367        float cacheRatioOnCurrentServer =
368          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
369        if (cacheRatioOnCurrentServer < minCacheRatio) {
370          minCacheRatio = cacheRatioOnCurrentServer;
371          leastCachedRegion = regionIndex;
372        }
373      }
374      return leastCachedRegion;
375    }
376  }
377
378  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
379    static final String REGION_COUNT_SKEW_COST_KEY =
380      "hbase.master.balancer.stochastic.regionCountCost";
381    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
382    private final DoubleArrayCost cost = new DoubleArrayCost();
383
384    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
385      // Load multiplier should be the greatest as it is the most general way to balance data.
386      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
387    }
388
389    @Override
390    void prepare(BalancerClusterState cluster) {
391      super.prepare(cluster);
392      cost.prepare(cluster.numServers);
393      cost.applyCostsChange(costs -> {
394        for (int i = 0; i < cluster.numServers; i++) {
395          costs[i] = cluster.regionsPerServer[i].length;
396        }
397      });
398    }
399
400    @Override
401    protected double cost() {
402      return cost.cost();
403    }
404
405    @Override
406    protected void regionMoved(int region, int oldServer, int newServer) {
407      cost.applyCostsChange(costs -> {
408        costs[oldServer] = cluster.regionsPerServer[oldServer].length;
409        costs[newServer] = cluster.regionsPerServer[newServer].length;
410      });
411    }
412
413    @Override
414    public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
415      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
416    }
417  }
418
419  static class CacheAwareCostFunction extends CostFunction {
420    private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
421    private double cacheRatio;
422    private double bestCacheRatio;
423
424    private static final float DEFAULT_CACHE_COST = 20;
425
426    CacheAwareCostFunction(Configuration conf) {
427      boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null;
428      // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled
429      this.setMultiplier(
430        !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST));
431      bestCacheRatio = 0.0;
432      cacheRatio = 0.0;
433    }
434
435    @Override
436    void prepare(BalancerClusterState cluster) {
437      super.prepare(cluster);
438      cacheRatio = 0.0;
439      bestCacheRatio = 0.0;
440
441      for (int region = 0; region < cluster.numRegions; region++) {
442        cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
443          cluster.regionIndexToServerIndex[region]);
444        bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
445          getServerWithBestCacheRatioForRegion(region));
446      }
447
448      cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio;
449      if (LOG.isDebugEnabled()) {
450        LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
451      }
452    }
453
454    @Override
455    protected double cost() {
456      return scale(0, 1, 1 - cacheRatio);
457    }
458
459    @Override
460    protected void regionMoved(int region, int oldServer, int newServer) {
461      double regionCacheRatioOnOldServer =
462        cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
463      double regionCacheRatioOnNewServer =
464        cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
465      double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
466      double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
467      cacheRatio += normalizedDelta;
468      if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
469        LOG.debug(
470          "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
471            + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
472          cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
473          cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
474          regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
475      }
476    }
477
478    private int getServerWithBestCacheRatioForRegion(int region) {
479      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
480    }
481
482    @Override
483    public void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
484      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
485    }
486  }
487}