001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020/** An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions
021 * based on the amount they are cached on a given server. A region can move across the region
022 * servers whenever a region server shuts down or crashes. The region server preserves the cache
023 * periodically and restores the cache when it is restarted. This balancer implements a mechanism
024 * where it maintains the amount by which a region is cached on a region server. During balancer
025 * run, a region plan is generated that takes into account this cache information and tries to
026 * move the regions so that the cache is minimally impacted.
027 */
028
029import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
030
031import java.util.ArrayDeque;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Deque;
035import java.util.HashMap;
036import java.util.List;
037import java.util.Map;
038import java.util.Optional;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.ClusterMetrics;
041import org.apache.hadoop.hbase.RegionMetrics;
042import org.apache.hadoop.hbase.ServerMetrics;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.Size;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.master.RegionPlan;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053@InterfaceAudience.Private
054public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
055  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);
056
057  public static final String CACHE_RATIO_THRESHOLD =
058    "hbase.master.balancer.stochastic.throttling.cacheRatio";
059  public static final float CACHE_RATIO_THRESHOLD_DEFAULT = 0.8f;
060
061  public Float ratioThreshold;
062
063  private Long sleepTime;
064  private Configuration configuration;
065
066  public enum GeneratorFunctionType {
067    LOAD,
068    CACHE_RATIO
069  }
070
071  @Override
072  public void loadConf(Configuration configuration) {
073    this.configuration = configuration;
074    this.costFunctions = new ArrayList<>();
075    super.loadConf(configuration);
076    ratioThreshold =
077      this.configuration.getFloat(CACHE_RATIO_THRESHOLD, CACHE_RATIO_THRESHOLD_DEFAULT);
078    sleepTime = configuration.getLong(MOVE_THROTTLING, MOVE_THROTTLING_DEFAULT.toMillis());
079  }
080
081  @Override
082  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
083    createCandidateGenerators(Configuration conf) {
084    Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
085      new HashMap<>(2);
086    candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class,
087      new CacheAwareSkewnessCandidateGenerator());
088    candidateGenerators.put(CacheAwareCandidateGenerator.class, new CacheAwareCandidateGenerator());
089    return candidateGenerators;
090  }
091
092  @Override
093  protected List<CostFunction> createCostFunctions(Configuration configuration) {
094    List<CostFunction> costFunctions = new ArrayList<>();
095    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
096    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
097    return costFunctions;
098  }
099
100  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
101    if (costFunction.getMultiplier() > 0) {
102      costFunctions.add(costFunction);
103    }
104  }
105
106  @Override
107  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
108    this.clusterStatus = clusterMetrics;
109    updateRegionLoad();
110  }
111
112  /**
113   * Collect the amount of region cached for all the regions from all the active region servers.
114   */
115  private void updateRegionLoad() {
116    loads = new HashMap<>();
117    regionCacheRatioOnOldServerMap = new HashMap<>();
118    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();
119
120    // Build current region cache statistics
121    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
122      // Create a map of region and the server where it is currently hosted
123      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
124        String regionEncodedName = RegionInfo.encodeRegionName(regionName);
125
126        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();
127
128        // Get the total size of the hFiles in this region
129        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
130
131        rload.add(new BalancerRegionLoad(rm));
132        // Maintain a map of region and its total size. This is needed to calculate the cache
133        // ratios for the regions cached on old region servers
134        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
135        loads.put(regionEncodedName, rload);
136      });
137    });
138
139    // Build cache statistics for the regions hosted previously on old region servers
140    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
141      // Find if a region was previously hosted on a server other than the one it is currently
142      // hosted on.
143      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
144        // If the region is found in regionCacheRatioOnCurrentServerMap, it is currently hosted on
145        // this server
146        if (regionCacheRatioOnCurrentServerMap.containsKey(regionEncodedName)) {
147          ServerName currentServer =
148            regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getFirst();
149          if (!ServerName.isSameAddress(currentServer, sn)) {
150            int regionSizeMB =
151              regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond();
152            float regionCacheRatioOnOldServer =
153              regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB;
154            regionCacheRatioOnOldServerMap.put(regionEncodedName,
155              new Pair<>(sn, regionCacheRatioOnOldServer));
156          }
157        }
158      });
159    });
160  }
161
162  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
163    Optional<RegionInfo> regionInfoOptional =
164      Arrays.stream(cluster.regions).filter((RegionInfo ri) -> {
165        return regionName.equals(ri.getEncodedName());
166      }).findFirst();
167
168    if (regionInfoOptional.isPresent()) {
169      return regionInfoOptional.get();
170    }
171    return null;
172  }
173
174  @Override
175  public long getThrottleDurationMs(RegionPlan plan) {
176    Pair<ServerName, Float> rsRatio = this.regionCacheRatioOnOldServerMap.get(plan.getRegionName());
177    if (
178      rsRatio != null && plan.getDestination().equals(rsRatio.getFirst())
179        && rsRatio.getSecond() >= ratioThreshold
180    ) {
181      LOG.debug("Moving region {} to server {} with cache ratio {}. No throttling needed.",
182        plan.getRegionInfo().getEncodedName(), plan.getDestination(), rsRatio.getSecond());
183      return 0L;
184    } else {
185      if (rsRatio != null) {
186        LOG.debug("Moving region {} to server {} with cache ratio: {}. Throttling move for {}ms.",
187          plan.getRegionInfo().getEncodedName(), plan.getDestination(),
188          plan.getDestination().equals(rsRatio.getFirst()) ? rsRatio.getSecond() : "unknown",
189          sleepTime);
190      } else {
191        LOG.debug(
192          "Moving region {} to server {} with no cache ratio info for the region. "
193            + "Throttling move for {}ms.",
194          plan.getRegionInfo().getEncodedName(), plan.getDestination(), sleepTime);
195      }
196      return sleepTime;
197    }
198  }
199
200  @Override
201  protected List<RegionPlan> balanceTable(TableName tableName,
202    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
203    final Map<String, Pair<ServerName, Float>> snapshot = new HashMap<>();
204    snapshot.putAll(this.regionCacheRatioOnOldServerMap);
205    List<RegionPlan> plans = super.balanceTable(tableName, loadOfOneTable);
206    if (plans == null) {
207      return plans;
208    }
209    plans.sort((p1, p2) -> {
210      Pair<ServerName, Float> pair1 = snapshot.get(p1.getRegionName());
211      Float ratio1 =
212        pair1 == null ? 0 : pair1.getFirst().equals(p1.getDestination()) ? pair1.getSecond() : 0f;
213      Pair<ServerName, Float> pair2 = snapshot.get(p2.getRegionName());
214      Float ratio2 =
215        pair2 == null ? 0 : pair2.getFirst().equals(p2.getDestination()) ? pair2.getSecond() : 0f;
216      return ratio1.compareTo(ratio2) * (-1);
217    });
218    return plans;
219  }
220
221  private class CacheAwareCandidateGenerator extends CandidateGenerator {
222    @Override
223    protected BalanceAction generate(BalancerClusterState cluster) {
224      // Move the regions to the servers they were previously hosted on based on the cache ratio
225      if (
226        !regionCacheRatioOnOldServerMap.isEmpty()
227          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
228      ) {
229        Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap =
230          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
231        // Get the server where this region was previously hosted
232        String regionEncodedName = regionCacheRatioServerMap.getKey();
233        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
234        if (regionInfo == null) {
235          LOG.warn("Region {} not found", regionEncodedName);
236          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
237          return BalanceAction.NULL_ACTION;
238        }
239        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
240          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
241          return BalanceAction.NULL_ACTION;
242        }
243        int regionIndex = cluster.regionsToIndex.get(regionInfo);
244        int oldServerIndex = cluster.serversToIndex
245          .get(regionCacheRatioOnOldServerMap.get(regionEncodedName).getFirst().getAddress());
246        if (oldServerIndex < 0) {
247          LOG.warn("Server previously hosting region {} not found", regionEncodedName);
248          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
249          return BalanceAction.NULL_ACTION;
250        }
251
252        float oldRegionCacheRatio =
253          cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
254        int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
255        float currentRegionCacheRatio =
256          cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);
257
258        BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
259          currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
260        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
261        return action;
262      }
263      return BalanceAction.NULL_ACTION;
264    }
265
266    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
267      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
268      float cacheRatioOnOldServer) {
269      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
270        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
271          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
272          : BalanceAction.NULL_ACTION;
273    }
274
275    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
276      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
277      float cacheRatioOnOldServer) {
278      // Find if the region has already moved by comparing the current server index with the
279      // current server index. This can happen when other candidate generator has moved the region
280      if (currentServerIndex < 0 || oldServerIndex < 0) {
281        return false;
282      }
283
284      float cacheRatioDiffThreshold = 0.6f;
285
286      // Conditions for moving the region
287
288      // If the region is fully cached on the old server, move the region back
289      if (cacheRatioOnOldServer == 1.0f) {
290        if (LOG.isDebugEnabled()) {
291          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
292            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
293        }
294        return true;
295      }
296
297      // Move the region back to the old server if it is cached equally on both the servers
298      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
299        if (LOG.isDebugEnabled()) {
300          LOG.debug(
301            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
302            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
303            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
304        }
305        return true;
306      }
307
308      // If the region is not fully cached on either of the servers, move the region back to the
309      // old server if the region cache ratio on the current server is still much less than the old
310      // server
311      if (
312        cacheRatioOnOldServer > 0.0f
313          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold
314      ) {
315        if (LOG.isDebugEnabled()) {
316          LOG.debug(
317            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
318              + "cache ratio {}",
319            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
320            cluster.servers[oldServerIndex], cacheRatioOnOldServer, cacheRatioOnCurrentServer);
321        }
322        return true;
323      }
324
325      if (LOG.isDebugEnabled()) {
326        LOG.debug(
327          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
328          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
329          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
330      }
331      return false;
332    }
333  }
334
335  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
336    @Override
337    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
338      // First move all the regions which were hosted previously on some other server back to their
339      // old servers
340      if (
341        !regionCacheRatioOnOldServerMap.isEmpty()
342          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
343      ) {
344        // Get the first region index in the historical cache ratio list
345        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
346          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
347        String regionEncodedName = regionEntry.getKey();
348
349        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
350        if (regionInfo == null) {
351          LOG.warn("Region {} does not exist", regionEncodedName);
352          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
353          return BalanceAction.NULL_ACTION;
354        }
355        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
356          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
357          return BalanceAction.NULL_ACTION;
358        }
359
360        int regionIndex = cluster.regionsToIndex.get(regionInfo);
361
362        // Get the current host name for this region
363        thisServer = cluster.regionIndexToServerIndex[regionIndex];
364
365        // Get the old server index
366        otherServer = cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress());
367
368        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
369
370        if (otherServer < 0) {
371          // The old server has been moved to other host and hence, the region cannot be moved back
372          // to the old server
373          if (LOG.isDebugEnabled()) {
374            LOG.debug(
375              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
376                + "server {} as the server does not exist",
377              regionEncodedName, regionEntry.getValue().getFirst().getHostname());
378          }
379          return BalanceAction.NULL_ACTION;
380        }
381
382        if (LOG.isDebugEnabled()) {
383          LOG.debug(
384            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
385              + "was hosted there earlier",
386            regionEncodedName, cluster.servers[thisServer].getHostname(),
387            cluster.servers[otherServer].getHostname());
388        }
389
390        return getAction(thisServer, regionIndex, otherServer, -1);
391      }
392
393      if (thisServer < 0 || otherServer < 0) {
394        return BalanceAction.NULL_ACTION;
395      }
396
397      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
398      if (regionIndexToMove < 0) {
399        if (LOG.isDebugEnabled()) {
400          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
401        }
402        return BalanceAction.NULL_ACTION;
403      }
404      if (LOG.isDebugEnabled()) {
405        LOG.debug(
406          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
407            + "least cached on current server",
408          cluster.regions[regionIndexToMove].getEncodedName(),
409          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
410      }
411      return getAction(thisServer, regionIndexToMove, otherServer, -1);
412    }
413
414    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
415      float minCacheRatio = Float.MAX_VALUE;
416      int leastCachedRegion = -1;
417      for (int i = 0; i < cluster.regionsPerServer[thisServer].length; i++) {
418        int regionIndex = cluster.regionsPerServer[thisServer][i];
419
420        float cacheRatioOnCurrentServer =
421          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
422        if (cacheRatioOnCurrentServer < minCacheRatio) {
423          minCacheRatio = cacheRatioOnCurrentServer;
424          leastCachedRegion = regionIndex;
425        }
426      }
427      return leastCachedRegion;
428    }
429  }
430
431  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
432    static final String REGION_COUNT_SKEW_COST_KEY =
433      "hbase.master.balancer.stochastic.regionCountCost";
434    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
435    private final DoubleArrayCost cost = new DoubleArrayCost();
436
437    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
438      // Load multiplier should be the greatest as it is the most general way to balance data.
439      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
440    }
441
442    @Override
443    void prepare(BalancerClusterState cluster) {
444      super.prepare(cluster);
445      cost.prepare(cluster.numServers);
446      cost.applyCostsChange(costs -> {
447        for (int i = 0; i < cluster.numServers; i++) {
448          costs[i] = cluster.regionsPerServer[i].length;
449        }
450      });
451    }
452
453    @Override
454    protected double cost() {
455      return cost.cost();
456    }
457
458    @Override
459    protected void regionMoved(int region, int oldServer, int newServer) {
460      cost.applyCostsChange(costs -> {
461        costs[oldServer] = cluster.regionsPerServer[oldServer].length;
462        costs[newServer] = cluster.regionsPerServer[newServer].length;
463      });
464    }
465
466    @Override
467    public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
468      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
469    }
470  }
471
472  static class CacheAwareCostFunction extends CostFunction {
473    private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
474    private double cacheRatio;
475    private double bestCacheRatio;
476
477    private static final float DEFAULT_CACHE_COST = 20;
478
479    CacheAwareCostFunction(Configuration conf) {
480      boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null;
481      // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled
482      this.setMultiplier(
483        !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST));
484      bestCacheRatio = 0.0;
485      cacheRatio = 0.0;
486    }
487
488    @Override
489    void prepare(BalancerClusterState cluster) {
490      super.prepare(cluster);
491      cacheRatio = 0.0;
492      bestCacheRatio = 0.0;
493
494      for (int region = 0; region < cluster.numRegions; region++) {
495        cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
496          cluster.regionIndexToServerIndex[region]);
497        bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
498          getServerWithBestCacheRatioForRegion(region));
499      }
500
501      cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio;
502      if (LOG.isDebugEnabled()) {
503        LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
504      }
505    }
506
507    @Override
508    protected double cost() {
509      return scale(0, 1, 1 - cacheRatio);
510    }
511
512    @Override
513    protected void regionMoved(int region, int oldServer, int newServer) {
514      double regionCacheRatioOnOldServer =
515        cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
516      double regionCacheRatioOnNewServer =
517        cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
518      double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
519      double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
520      cacheRatio += normalizedDelta;
521      if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
522        LOG.debug(
523          "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
524            + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
525          cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
526          cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
527          regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
528      }
529    }
530
531    private int getServerWithBestCacheRatioForRegion(int region) {
532      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
533    }
534
535    @Override
536    public void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
537      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
538    }
539  }
540}