001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
/** An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions
 * based on the amount they are cached on a given server. A region can move across the region
 * servers whenever a region server shuts down or crashes. The region server preserves the cache
 * periodically and restores the cache when it is restarted. This balancer implements a mechanism
 * where it maintains the amount by which a region is cached on a region server. During a balancer
 * run, a region plan is generated that takes into account this cache information and tries to
 * move the regions so that the cache is minimally impacted.
 */
028
029import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
030
031import java.text.DecimalFormat;
032import java.util.ArrayDeque;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Deque;
036import java.util.HashMap;
037import java.util.List;
038import java.util.Map;
039import java.util.Optional;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.ClusterMetrics;
042import org.apache.hadoop.hbase.RegionMetrics;
043import org.apache.hadoop.hbase.ServerMetrics;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.Size;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.util.Pair;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052@InterfaceAudience.Private
053public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
  /** Logger shared by this balancer and its nested candidate generators and cost functions. */
  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);

  /** The configuration most recently handed to {@code loadConf(Configuration)}. */
  private Configuration configuration;
057
  /**
   * Identifies the two candidate generators of this balancer. The ordinal of each constant is used
   * as the position of the matching generator in the list built by
   * {@code createCandidateGenerators()} and as the slot of the weights array that the matching
   * cost function updates, so the declaration order must not change.
   */
  public enum GeneratorFunctionType {
    /** Slot shared by the skewness candidate generator and the region skewness cost function. */
    LOAD,
    /** Slot shared by the cache-aware candidate generator and the cache-aware cost function. */
    CACHE_RATIO
  }
062
063  @Override
064  public synchronized void loadConf(Configuration configuration) {
065    this.configuration = configuration;
066    this.costFunctions = new ArrayList<>();
067    super.loadConf(configuration);
068  }
069
070  @Override
071  protected List<CandidateGenerator> createCandidateGenerators() {
072    List<CandidateGenerator> candidateGenerators = new ArrayList<>(2);
073    candidateGenerators.add(GeneratorFunctionType.LOAD.ordinal(),
074      new CacheAwareSkewnessCandidateGenerator());
075    candidateGenerators.add(GeneratorFunctionType.CACHE_RATIO.ordinal(),
076      new CacheAwareCandidateGenerator());
077    return candidateGenerators;
078  }
079
080  @Override
081  protected List<CostFunction> createCostFunctions(Configuration configuration) {
082    List<CostFunction> costFunctions = new ArrayList<>();
083    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
084    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
085    return costFunctions;
086  }
087
088  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
089    if (costFunction.getMultiplier() > 0) {
090      costFunctions.add(costFunction);
091    }
092  }
093
  /**
   * Stores the given cluster metrics as the current cluster status and rebuilds the region load
   * and old-server cache ratio maps from it.
   * @param clusterMetrics the metrics to remember as the current cluster status
   */
  @Override
  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
    // Must be assigned first: updateRegionLoad() reads clusterStatus.
    this.clusterStatus = clusterMetrics;
    updateRegionLoad();
  }
099
100  /**
101   * Collect the amount of region cached for all the regions from all the active region servers.
102   */
103  private void updateRegionLoad() {
104    loads = new HashMap<>();
105    regionCacheRatioOnOldServerMap = new HashMap<>();
106    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();
107
108    // Build current region cache statistics
109    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
110      // Create a map of region and the server where it is currently hosted
111      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
112        String regionEncodedName = RegionInfo.encodeRegionName(regionName);
113
114        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();
115
116        // Get the total size of the hFiles in this region
117        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
118
119        rload.add(new BalancerRegionLoad(rm));
120        // Maintain a map of region and it's total size. This is needed to calculate the cache
121        // ratios for the regions cached on old region servers
122        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
123        loads.put(regionEncodedName, rload);
124      });
125    });
126
127    // Build cache statistics for the regions hosted previously on old region servers
128    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
129      // Find if a region was previously hosted on a server other than the one it is currently
130      // hosted on.
131      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
132        // If the region is found in regionCacheRatioOnCurrentServerMap, it is currently hosted on
133        // this server
134        if (regionCacheRatioOnCurrentServerMap.containsKey(regionEncodedName)) {
135          ServerName currentServer =
136            regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getFirst();
137          if (!ServerName.isSameAddress(currentServer, sn)) {
138            int regionSizeMB =
139              regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond();
140            float regionCacheRatioOnOldServer =
141              regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB;
142            regionCacheRatioOnOldServerMap.put(regionEncodedName,
143              new Pair<>(sn, regionCacheRatioOnOldServer));
144          }
145        }
146      });
147    });
148  }
149
150  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
151    Optional<RegionInfo> regionInfoOptional =
152      Arrays.stream(cluster.regions).filter((RegionInfo ri) -> {
153        return regionName.equals(ri.getEncodedName());
154      }).findFirst();
155
156    if (regionInfoOptional.isPresent()) {
157      return regionInfoOptional.get();
158    }
159    return null;
160  }
161
162  private class CacheAwareCandidateGenerator extends CandidateGenerator {
163    @Override
164    protected BalanceAction generate(BalancerClusterState cluster) {
165      // Move the regions to the servers they were previously hosted on based on the cache ratio
166      if (
167        !regionCacheRatioOnOldServerMap.isEmpty()
168          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
169      ) {
170        Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap =
171          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
172        // Get the server where this region was previously hosted
173        String regionEncodedName = regionCacheRatioServerMap.getKey();
174        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
175        if (regionInfo == null) {
176          LOG.warn("Region {} not found", regionEncodedName);
177          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
178          return BalanceAction.NULL_ACTION;
179        }
180        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
181          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
182          return BalanceAction.NULL_ACTION;
183        }
184        int regionIndex = cluster.regionsToIndex.get(regionInfo);
185        int oldServerIndex = cluster.serversToIndex
186          .get(regionCacheRatioOnOldServerMap.get(regionEncodedName).getFirst().getAddress());
187        if (oldServerIndex < 0) {
188          LOG.warn("Server previously hosting region {} not found", regionEncodedName);
189          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
190          return BalanceAction.NULL_ACTION;
191        }
192
193        float oldRegionCacheRatio =
194          cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
195        int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
196        float currentRegionCacheRatio =
197          cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);
198
199        BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
200          currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
201        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
202        return action;
203      }
204      return BalanceAction.NULL_ACTION;
205    }
206
207    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
208      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
209      float cacheRatioOnOldServer) {
210      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
211        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
212          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
213          : BalanceAction.NULL_ACTION;
214    }
215
216    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
217      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
218      float cacheRatioOnOldServer) {
219      // Find if the region has already moved by comparing the current server index with the
220      // current server index. This can happen when other candidate generator has moved the region
221      if (currentServerIndex < 0 || oldServerIndex < 0) {
222        return false;
223      }
224
225      DecimalFormat df = new DecimalFormat("#");
226      df.setMaximumFractionDigits(4);
227
228      float cacheRatioDiffThreshold = 0.6f;
229
230      // Conditions for moving the region
231
232      // If the region is fully cached on the old server, move the region back
233      if (cacheRatioOnOldServer == 1.0f) {
234        if (LOG.isDebugEnabled()) {
235          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
236            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
237        }
238        return true;
239      }
240
241      // Move the region back to the old server if it is cached equally on both the servers
242      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
243        if (LOG.isDebugEnabled()) {
244          LOG.debug(
245            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
246            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
247            cluster.servers[oldServerIndex], df.format(cacheRatioOnCurrentServer));
248        }
249        return true;
250      }
251
252      // If the region is not fully cached on either of the servers, move the region back to the
253      // old server if the region cache ratio on the current server is still much less than the old
254      // server
255      if (
256        cacheRatioOnOldServer > 0.0f
257          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold
258      ) {
259        if (LOG.isDebugEnabled()) {
260          LOG.debug(
261            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
262              + "cache ratio {}",
263            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
264            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer,
265            df.format(cacheRatioOnCurrentServer));
266        }
267        return true;
268      }
269
270      if (LOG.isDebugEnabled()) {
271        LOG.debug(
272          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
273          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
274          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer,
275          df.format(cacheRatioOnCurrentServer));
276      }
277      return false;
278    }
279  }
280
281  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
282    @Override
283    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
284      // First move all the regions which were hosted previously on some other server back to their
285      // old servers
286      if (
287        !regionCacheRatioOnOldServerMap.isEmpty()
288          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
289      ) {
290        // Get the first region index in the historical cache ratio list
291        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
292          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
293        String regionEncodedName = regionEntry.getKey();
294
295        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
296        if (regionInfo == null) {
297          LOG.warn("Region {} does not exist", regionEncodedName);
298          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
299          return BalanceAction.NULL_ACTION;
300        }
301        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
302          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
303          return BalanceAction.NULL_ACTION;
304        }
305
306        int regionIndex = cluster.regionsToIndex.get(regionInfo);
307
308        // Get the current host name for this region
309        thisServer = cluster.regionIndexToServerIndex[regionIndex];
310
311        // Get the old server index
312        otherServer = cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress());
313
314        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
315
316        if (otherServer < 0) {
317          // The old server has been moved to other host and hence, the region cannot be moved back
318          // to the old server
319          if (LOG.isDebugEnabled()) {
320            LOG.debug(
321              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
322                + "server {} as the server does not exist",
323              regionEncodedName, regionEntry.getValue().getFirst().getHostname());
324          }
325          return BalanceAction.NULL_ACTION;
326        }
327
328        if (LOG.isDebugEnabled()) {
329          LOG.debug(
330            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
331              + "was hosted their earlier",
332            regionEncodedName, cluster.servers[thisServer].getHostname(),
333            cluster.servers[otherServer].getHostname());
334        }
335
336        return getAction(thisServer, regionIndex, otherServer, -1);
337      }
338
339      if (thisServer < 0 || otherServer < 0) {
340        return BalanceAction.NULL_ACTION;
341      }
342
343      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
344      if (regionIndexToMove < 0) {
345        if (LOG.isDebugEnabled()) {
346          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
347        }
348        return BalanceAction.NULL_ACTION;
349      }
350      if (LOG.isDebugEnabled()) {
351        LOG.debug(
352          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
353            + "least cached on current server",
354          cluster.regions[regionIndexToMove].getEncodedName(),
355          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
356      }
357      return getAction(thisServer, regionIndexToMove, otherServer, -1);
358    }
359
360    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
361      float minCacheRatio = Float.MAX_VALUE;
362      int leastCachedRegion = -1;
363      for (int i = 0; i < cluster.regionsPerServer[thisServer].length; i++) {
364        int regionIndex = cluster.regionsPerServer[thisServer][i];
365
366        float cacheRatioOnCurrentServer =
367          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
368        if (cacheRatioOnCurrentServer < minCacheRatio) {
369          minCacheRatio = cacheRatioOnCurrentServer;
370          leastCachedRegion = regionIndex;
371        }
372      }
373      return leastCachedRegion;
374    }
375  }
376
377  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
378    static final String REGION_COUNT_SKEW_COST_KEY =
379      "hbase.master.balancer.stochastic.regionCountCost";
380    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
381    private final DoubleArrayCost cost = new DoubleArrayCost();
382
383    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
384      // Load multiplier should be the greatest as it is the most general way to balance data.
385      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
386    }
387
388    @Override
389    void prepare(BalancerClusterState cluster) {
390      super.prepare(cluster);
391      cost.prepare(cluster.numServers);
392      cost.applyCostsChange(costs -> {
393        for (int i = 0; i < cluster.numServers; i++) {
394          costs[i] = cluster.regionsPerServer[i].length;
395        }
396      });
397    }
398
399    @Override
400    protected double cost() {
401      return cost.cost();
402    }
403
404    @Override
405    protected void regionMoved(int region, int oldServer, int newServer) {
406      cost.applyCostsChange(costs -> {
407        costs[oldServer] = cluster.regionsPerServer[oldServer].length;
408        costs[newServer] = cluster.regionsPerServer[newServer].length;
409      });
410    }
411
412    public final void updateWeight(double[] weights) {
413      weights[GeneratorFunctionType.LOAD.ordinal()] += cost();
414    }
415  }
416
417  static class CacheAwareCostFunction extends CostFunction {
418    private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
419    private double cacheRatio;
420    private double bestCacheRatio;
421
422    private static final float DEFAULT_CACHE_COST = 20;
423
424    CacheAwareCostFunction(Configuration conf) {
425      boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null;
426      // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled
427      this.setMultiplier(
428        !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST));
429      bestCacheRatio = 0.0;
430      cacheRatio = 0.0;
431    }
432
433    @Override
434    void prepare(BalancerClusterState cluster) {
435      super.prepare(cluster);
436      cacheRatio = 0.0;
437      bestCacheRatio = 0.0;
438
439      for (int region = 0; region < cluster.numRegions; region++) {
440        cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
441          cluster.regionIndexToServerIndex[region]);
442        bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
443          getServerWithBestCacheRatioForRegion(region));
444      }
445
446      cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio;
447      if (LOG.isDebugEnabled()) {
448        LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
449      }
450    }
451
452    @Override
453    protected double cost() {
454      return scale(0, 1, 1 - cacheRatio);
455    }
456
457    @Override
458    protected void regionMoved(int region, int oldServer, int newServer) {
459      double regionCacheRatioOnOldServer =
460        cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
461      double regionCacheRatioOnNewServer =
462        cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
463      double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
464      double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
465      cacheRatio += normalizedDelta;
466      if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
467        LOG.debug(
468          "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
469            + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
470          cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
471          cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
472          regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
473      }
474    }
475
476    private int getServerWithBestCacheRatioForRegion(int region) {
477      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
478    }
479
480    @Override
481    public final void updateWeight(double[] weights) {
482      weights[GeneratorFunctionType.CACHE_RATIO.ordinal()] += cost();
483    }
484  }
485}