001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
/** An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions
 * based on the amount they are cached on a given server. A region can move across the region
 * servers whenever a region server shuts down or crashes. The region server preserves the cache
 * periodically and restores the cache when it is restarted. This balancer implements a mechanism
 * where it maintains the amount by which a region is cached on a region server. During a balancer
 * run, a region plan is generated that takes into account this cache information and tries to
 * move the regions so that the cache is minimally impacted.
 */
028
029import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
030
031import java.util.ArrayDeque;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Deque;
035import java.util.HashMap;
036import java.util.List;
037import java.util.Map;
038import java.util.Optional;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.ClusterMetrics;
041import org.apache.hadoop.hbase.RegionMetrics;
042import org.apache.hadoop.hbase.ServerMetrics;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.Size;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.util.Pair;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@InterfaceAudience.Private
052public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
053  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);
054
055  private Configuration configuration;
056
057  public enum GeneratorFunctionType {
058    LOAD,
059    CACHE_RATIO
060  }
061
062  @Override
063  public synchronized void loadConf(Configuration configuration) {
064    this.configuration = configuration;
065    this.costFunctions = new ArrayList<>();
066    super.loadConf(configuration);
067  }
068
069  @Override
070  protected List<CandidateGenerator> createCandidateGenerators() {
071    List<CandidateGenerator> candidateGenerators = new ArrayList<>(2);
072    candidateGenerators.add(GeneratorFunctionType.LOAD.ordinal(),
073      new CacheAwareSkewnessCandidateGenerator());
074    candidateGenerators.add(GeneratorFunctionType.CACHE_RATIO.ordinal(),
075      new CacheAwareCandidateGenerator());
076    return candidateGenerators;
077  }
078
079  @Override
080  protected List<CostFunction> createCostFunctions(Configuration configuration) {
081    List<CostFunction> costFunctions = new ArrayList<>();
082    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
083    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
084    return costFunctions;
085  }
086
087  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
088    if (costFunction.getMultiplier() > 0) {
089      costFunctions.add(costFunction);
090    }
091  }
092
093  @Override
094  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
095    this.clusterStatus = clusterMetrics;
096    updateRegionLoad();
097  }
098
099  /**
100   * Collect the amount of region cached for all the regions from all the active region servers.
101   */
102  private void updateRegionLoad() {
103    loads = new HashMap<>();
104    regionCacheRatioOnOldServerMap = new HashMap<>();
105    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();
106
107    // Build current region cache statistics
108    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
109      // Create a map of region and the server where it is currently hosted
110      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
111        String regionEncodedName = RegionInfo.encodeRegionName(regionName);
112
113        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();
114
115        // Get the total size of the hFiles in this region
116        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
117
118        rload.add(new BalancerRegionLoad(rm));
119        // Maintain a map of region and it's total size. This is needed to calculate the cache
120        // ratios for the regions cached on old region servers
121        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
122        loads.put(regionEncodedName, rload);
123      });
124    });
125
126    // Build cache statistics for the regions hosted previously on old region servers
127    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
128      // Find if a region was previously hosted on a server other than the one it is currently
129      // hosted on.
130      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
131        // If the region is found in regionCacheRatioOnCurrentServerMap, it is currently hosted on
132        // this server
133        if (regionCacheRatioOnCurrentServerMap.containsKey(regionEncodedName)) {
134          ServerName currentServer =
135            regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getFirst();
136          if (!ServerName.isSameAddress(currentServer, sn)) {
137            int regionSizeMB =
138              regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond();
139            float regionCacheRatioOnOldServer =
140              regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB;
141            regionCacheRatioOnOldServerMap.put(regionEncodedName,
142              new Pair<>(sn, regionCacheRatioOnOldServer));
143          }
144        }
145      });
146    });
147  }
148
149  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
150    Optional<RegionInfo> regionInfoOptional =
151      Arrays.stream(cluster.regions).filter((RegionInfo ri) -> {
152        return regionName.equals(ri.getEncodedName());
153      }).findFirst();
154
155    if (regionInfoOptional.isPresent()) {
156      return regionInfoOptional.get();
157    }
158    return null;
159  }
160
161  private class CacheAwareCandidateGenerator extends CandidateGenerator {
162    @Override
163    protected BalanceAction generate(BalancerClusterState cluster) {
164      // Move the regions to the servers they were previously hosted on based on the cache ratio
165      if (
166        !regionCacheRatioOnOldServerMap.isEmpty()
167          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
168      ) {
169        Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap =
170          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
171        // Get the server where this region was previously hosted
172        String regionEncodedName = regionCacheRatioServerMap.getKey();
173        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
174        if (regionInfo == null) {
175          LOG.warn("Region {} not found", regionEncodedName);
176          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
177          return BalanceAction.NULL_ACTION;
178        }
179        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
180          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
181          return BalanceAction.NULL_ACTION;
182        }
183        int regionIndex = cluster.regionsToIndex.get(regionInfo);
184        int oldServerIndex = cluster.serversToIndex
185          .get(regionCacheRatioOnOldServerMap.get(regionEncodedName).getFirst().getAddress());
186        if (oldServerIndex < 0) {
187          LOG.warn("Server previously hosting region {} not found", regionEncodedName);
188          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
189          return BalanceAction.NULL_ACTION;
190        }
191
192        float oldRegionCacheRatio =
193          cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
194        int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
195        float currentRegionCacheRatio =
196          cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);
197
198        BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
199          currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
200        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
201        return action;
202      }
203      return BalanceAction.NULL_ACTION;
204    }
205
206    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
207      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
208      float cacheRatioOnOldServer) {
209      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
210        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
211          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
212          : BalanceAction.NULL_ACTION;
213    }
214
215    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
216      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
217      float cacheRatioOnOldServer) {
218      // Find if the region has already moved by comparing the current server index with the
219      // current server index. This can happen when other candidate generator has moved the region
220      if (currentServerIndex < 0 || oldServerIndex < 0) {
221        return false;
222      }
223
224      float cacheRatioDiffThreshold = 0.6f;
225
226      // Conditions for moving the region
227
228      // If the region is fully cached on the old server, move the region back
229      if (cacheRatioOnOldServer == 1.0f) {
230        if (LOG.isDebugEnabled()) {
231          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
232            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
233        }
234        return true;
235      }
236
237      // Move the region back to the old server if it is cached equally on both the servers
238      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
239        if (LOG.isDebugEnabled()) {
240          LOG.debug(
241            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
242            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
243            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
244        }
245        return true;
246      }
247
248      // If the region is not fully cached on either of the servers, move the region back to the
249      // old server if the region cache ratio on the current server is still much less than the old
250      // server
251      if (
252        cacheRatioOnOldServer > 0.0f
253          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold
254      ) {
255        if (LOG.isDebugEnabled()) {
256          LOG.debug(
257            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
258              + "cache ratio {}",
259            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
260            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
261        }
262        return true;
263      }
264
265      if (LOG.isDebugEnabled()) {
266        LOG.debug(
267          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
268          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
269          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
270      }
271      return false;
272    }
273  }
274
275  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
276    @Override
277    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
278      // First move all the regions which were hosted previously on some other server back to their
279      // old servers
280      if (
281        !regionCacheRatioOnOldServerMap.isEmpty()
282          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
283      ) {
284        // Get the first region index in the historical cache ratio list
285        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
286          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
287        String regionEncodedName = regionEntry.getKey();
288
289        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
290        if (regionInfo == null) {
291          LOG.warn("Region {} does not exist", regionEncodedName);
292          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
293          return BalanceAction.NULL_ACTION;
294        }
295        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
296          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
297          return BalanceAction.NULL_ACTION;
298        }
299
300        int regionIndex = cluster.regionsToIndex.get(regionInfo);
301
302        // Get the current host name for this region
303        thisServer = cluster.regionIndexToServerIndex[regionIndex];
304
305        // Get the old server index
306        otherServer = cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress());
307
308        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
309
310        if (otherServer < 0) {
311          // The old server has been moved to other host and hence, the region cannot be moved back
312          // to the old server
313          if (LOG.isDebugEnabled()) {
314            LOG.debug(
315              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
316                + "server {} as the server does not exist",
317              regionEncodedName, regionEntry.getValue().getFirst().getHostname());
318          }
319          return BalanceAction.NULL_ACTION;
320        }
321
322        if (LOG.isDebugEnabled()) {
323          LOG.debug(
324            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
325              + "was hosted their earlier",
326            regionEncodedName, cluster.servers[thisServer].getHostname(),
327            cluster.servers[otherServer].getHostname());
328        }
329
330        return getAction(thisServer, regionIndex, otherServer, -1);
331      }
332
333      if (thisServer < 0 || otherServer < 0) {
334        return BalanceAction.NULL_ACTION;
335      }
336
337      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
338      if (regionIndexToMove < 0) {
339        if (LOG.isDebugEnabled()) {
340          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
341        }
342        return BalanceAction.NULL_ACTION;
343      }
344      if (LOG.isDebugEnabled()) {
345        LOG.debug(
346          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
347            + "least cached on current server",
348          cluster.regions[regionIndexToMove].getEncodedName(),
349          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
350      }
351      return getAction(thisServer, regionIndexToMove, otherServer, -1);
352    }
353
354    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
355      float minCacheRatio = Float.MAX_VALUE;
356      int leastCachedRegion = -1;
357      for (int i = 0; i < cluster.regionsPerServer[thisServer].length; i++) {
358        int regionIndex = cluster.regionsPerServer[thisServer][i];
359
360        float cacheRatioOnCurrentServer =
361          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
362        if (cacheRatioOnCurrentServer < minCacheRatio) {
363          minCacheRatio = cacheRatioOnCurrentServer;
364          leastCachedRegion = regionIndex;
365        }
366      }
367      return leastCachedRegion;
368    }
369  }
370
371  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
372    static final String REGION_COUNT_SKEW_COST_KEY =
373      "hbase.master.balancer.stochastic.regionCountCost";
374    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
375    private final DoubleArrayCost cost = new DoubleArrayCost();
376
377    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
378      // Load multiplier should be the greatest as it is the most general way to balance data.
379      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
380    }
381
382    @Override
383    void prepare(BalancerClusterState cluster) {
384      super.prepare(cluster);
385      cost.prepare(cluster.numServers);
386      cost.applyCostsChange(costs -> {
387        for (int i = 0; i < cluster.numServers; i++) {
388          costs[i] = cluster.regionsPerServer[i].length;
389        }
390      });
391    }
392
393    @Override
394    protected double cost() {
395      return cost.cost();
396    }
397
398    @Override
399    protected void regionMoved(int region, int oldServer, int newServer) {
400      cost.applyCostsChange(costs -> {
401        costs[oldServer] = cluster.regionsPerServer[oldServer].length;
402        costs[newServer] = cluster.regionsPerServer[newServer].length;
403      });
404    }
405
406    public final void updateWeight(double[] weights) {
407      weights[GeneratorFunctionType.LOAD.ordinal()] += cost();
408    }
409  }
410
411  static class CacheAwareCostFunction extends CostFunction {
412    private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
413    private double cacheRatio;
414    private double bestCacheRatio;
415
416    private static final float DEFAULT_CACHE_COST = 20;
417
418    CacheAwareCostFunction(Configuration conf) {
419      boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null;
420      // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled
421      this.setMultiplier(
422        !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST));
423      bestCacheRatio = 0.0;
424      cacheRatio = 0.0;
425    }
426
427    @Override
428    void prepare(BalancerClusterState cluster) {
429      super.prepare(cluster);
430      cacheRatio = 0.0;
431      bestCacheRatio = 0.0;
432
433      for (int region = 0; region < cluster.numRegions; region++) {
434        cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
435          cluster.regionIndexToServerIndex[region]);
436        bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
437          getServerWithBestCacheRatioForRegion(region));
438      }
439
440      cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio;
441      if (LOG.isDebugEnabled()) {
442        LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
443      }
444    }
445
446    @Override
447    protected double cost() {
448      return scale(0, 1, 1 - cacheRatio);
449    }
450
451    @Override
452    protected void regionMoved(int region, int oldServer, int newServer) {
453      double regionCacheRatioOnOldServer =
454        cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
455      double regionCacheRatioOnNewServer =
456        cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
457      double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
458      double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
459      cacheRatio += normalizedDelta;
460      if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
461        LOG.debug(
462          "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
463            + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
464          cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
465          cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
466          regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
467      }
468    }
469
470    private int getServerWithBestCacheRatioForRegion(int region) {
471      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
472    }
473
474    @Override
475    public final void updateWeight(double[] weights) {
476      weights[GeneratorFunctionType.CACHE_RATIO.ordinal()] += cost();
477    }
478  }
479}