001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
import com.google.errorprone.annotations.RestrictedApi;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.io.CharStreams;
042
/**
 * This is an optional cost function designed to allow region count skew across RegionServers. A
 * rule file is loaded from the local FS or HDFS before balancing. It contains lines of rules. A
 * rule is composed of a regexp for the hostname and a limit, separated by a space. Empty lines and
 * lines starting with '#' are ignored. For example, we could have:
 *
 * <pre>
 * rs[0-9] 200
 * rs1[0-9] 50
 * </pre>
 *
 * RegionServers whose hostname fully matches the first rule (rs0 to rs9) will have a limit of 200,
 * and those matching the second rule (rs10 to rs19) a limit of 50. If there's no match, a default
 * limit is used. The cost function tries to fill all RegionServers linearly, meaning that if the
 * global usage is at 50%, then every RegionServer should hold half of its capacity in terms of
 * regions. In order to use this cost function, you need to set the following options:
 * <ul>
 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
 * <li>hbase.master.balancer.heterogeneousRegionCountRulesFile</li>
 * <li>hbase.master.balancer.heterogeneousRegionCountDefault</li>
 * </ul>
 * The weight of this function can be tuned with
 * hbase.master.balancer.stochastic.heterogeneousRegionCountCost. The regular region count skew
 * cost (RegionCountSkewCostFunction) should be set to 0, otherwise it interferes with this
 * function. A rules path starting with {@code file:} is read from the local filesystem; any other
 * path is read through the Hadoop FileSystem API (for example an {@code hdfs://} URI).
 */
062@InterfaceAudience.Private
063public class HeterogeneousRegionCountCostFunction extends CostFunction {
064
065  /**
066   * configuration used for the path where the rule file is stored.
067   */
068  static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE =
069    "hbase.master.balancer.heterogeneousRegionCountRulesFile";
070  private static final Logger LOG =
071    LoggerFactory.getLogger(HeterogeneousRegionCountCostFunction.class);
072  /**
073   * Default rule to apply when the rule file is not found. Default to 200.
074   */
075  private static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT =
076    "hbase.master.balancer.heterogeneousRegionCountDefault";
077  /**
078   * Cost for the function. Default to 500, can be changed.
079   */
080  private static final String REGION_COUNT_SKEW_COST_KEY =
081    "hbase.master.balancer.stochastic.heterogeneousRegionCountCost";
082  private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
083  private final String rulesPath;
084
085  /**
086   * Contains the rules, key is the regexp for ServerName, value is the limit
087   */
088  private final Map<Pattern, Integer> limitPerRule;
089
090  /**
091   * This is a cache, used to not go through all the limitPerRule map when searching for limit
092   */
093  private final Map<ServerName, Integer> limitPerRS;
094  private final Configuration conf;
095  private int defaultNumberOfRegions;
096
097  /**
098   * Total capacity of regions for the cluster, based on the online RS and their associated rules
099   */
100  private int totalCapacity = 0;
101  double overallUsage;
102
103  HeterogeneousRegionCountCostFunction(final Configuration conf) {
104    this.conf = conf;
105    this.limitPerRS = new HashMap<>();
106    this.limitPerRule = new HashMap<>();
107    this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
108    this.rulesPath = conf.get(HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE);
109    this.defaultNumberOfRegions =
110      conf.getInt(HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT, 200);
111
112    if (this.defaultNumberOfRegions < 0) {
113      LOG.warn("invalid configuration '" + HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT
114        + "'. Setting default to 200");
115      this.defaultNumberOfRegions = 200;
116    }
117    if (
118      conf.getFloat(RegionCountSkewCostFunction.REGION_COUNT_SKEW_COST_KEY,
119        RegionCountSkewCostFunction.DEFAULT_REGION_COUNT_SKEW_COST) > 0
120    ) {
121      LOG.warn("regionCountCost is not set to 0, "
122        + " this will interfere with the HeterogeneousRegionCountCostFunction!");
123    }
124  }
125
126  /**
127   * Called once per LB invocation to give the cost function to initialize it's state, and perform
128   * any costly calculation.
129   */
130  @Override
131  void prepare(final BalancerClusterState cluster) {
132    this.cluster = cluster;
133    this.loadRules();
134  }
135
136  @Override
137  protected double cost() {
138    double cost = 0;
139    final double targetUsage = ((double) this.cluster.numRegions / (double) this.totalCapacity);
140
141    for (int i = 0; i < this.cluster.numServers; i++) {
142      // retrieve capacity for each RS
143      final ServerName sn = this.cluster.servers[i];
144      final double limit = this.limitPerRS.getOrDefault(sn, defaultNumberOfRegions);
145      final double nbrRegions = this.cluster.regionsPerServer[i].length;
146      final double usage = nbrRegions / limit;
147      if (usage > targetUsage) {
148        // cost is the number of regions above the local limit
149        final double localCost = (nbrRegions - Math.round(limit * targetUsage)) / limit;
150        cost += localCost;
151      }
152    }
153
154    return cost / (double) this.cluster.numServers;
155  }
156
157  /**
158   * used to load the rule files.
159   */
160  @RestrictedApi(explanation = "Should only be called in tests", link = "",
161      allowedOnPath = ".*(/src/test/.*|HeterogeneousRegionCountCostFunction).java")
162  void loadRules() {
163    final List<String> lines = readFile(this.rulesPath);
164    if (null == lines) {
165      LOG.warn("cannot load rules file, keeping latest rules file which has "
166        + this.limitPerRule.size() + " rules");
167      return;
168    }
169
170    LOG.info("loading rules file '" + this.rulesPath + "'");
171    this.limitPerRule.clear();
172    for (final String line : lines) {
173      try {
174        if (line.length() == 0) {
175          continue;
176        }
177        if (line.startsWith("#")) {
178          continue;
179        }
180        final List<String> splits = Splitter.on(' ').splitToList(line);
181        if (splits.size() != 2) {
182          throw new IOException(
183            "line '" + line + "' is malformated, " + "expected [regexp] [limit]. Skipping line");
184        }
185
186        final Pattern pattern = Pattern.compile(splits.get(0));
187        final Integer limit = Integer.parseInt(splits.get(1));
188        this.limitPerRule.put(pattern, limit);
189      } catch (final IOException | NumberFormatException | PatternSyntaxException e) {
190        LOG.error("error on line: " + e);
191      }
192    }
193    this.rebuildCache();
194  }
195
196  /**
197   * used to read the rule files from either HDFS or local FS
198   */
199  private List<String> readFile(final String filename) {
200    if (null == filename) {
201      return null;
202    }
203    try {
204      if (filename.startsWith("file:")) {
205        return readFileFromLocalFS(filename);
206      }
207      return readFileFromHDFS(filename);
208    } catch (IOException e) {
209      LOG.error("cannot read rules file located at ' " + filename + " ':" + e.getMessage());
210      return null;
211    }
212  }
213
214  /**
215   * used to read the rule files from HDFS
216   */
217  private List<String> readFileFromHDFS(final String filename) throws IOException {
218    final Path path = new Path(filename);
219    final FileSystem fs = FileSystem.get(this.conf);
220    try (BufferedReader reader =
221      new BufferedReader(new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
222      return CharStreams.readLines(reader);
223    }
224  }
225
226  /**
227   * used to read the rule files from local FS
228   */
229  private List<String> readFileFromLocalFS(final String filename) throws IOException {
230    return Files.readAllLines(Paths.get(filename), StandardCharsets.UTF_8);
231  }
232
233  /**
234   * Rebuild cache matching ServerNames and their capacity.
235   */
236  private void rebuildCache() {
237    LOG.debug("Rebuilding cache of capacity for each RS");
238    this.limitPerRS.clear();
239    this.totalCapacity = 0;
240    if (null == this.cluster) {
241      return;
242    }
243    for (int i = 0; i < this.cluster.numServers; i++) {
244      final ServerName sn = this.cluster.servers[i];
245      final int capacity = this.findLimitForRS(sn);
246      LOG.debug(sn.getHostname() + " can hold " + capacity + " regions");
247      this.totalCapacity += capacity;
248    }
249    overallUsage = (double) this.cluster.numRegions / (double) this.totalCapacity;
250    LOG.info("Cluster can hold " + this.cluster.numRegions + "/" + this.totalCapacity + " regions ("
251      + Math.round(overallUsage * 100) + "%)");
252    if (overallUsage >= 1) {
253      LOG.warn("Cluster is overused, {}", overallUsage);
254    }
255  }
256
257  /**
258   * Find the limit for a ServerName. If not found then return the default value
259   * @param serverName the server we are looking for
260   * @return the limit
261   */
262  int findLimitForRS(final ServerName serverName) {
263    boolean matched = false;
264    int limit = -1;
265    for (final Map.Entry<Pattern, Integer> entry : this.limitPerRule.entrySet()) {
266      if (entry.getKey().matcher(serverName.getHostname()).matches()) {
267        matched = true;
268        limit = entry.getValue();
269        break;
270      }
271    }
272    if (!matched) {
273      limit = this.defaultNumberOfRegions;
274    }
275    // Feeding cache
276    this.limitPerRS.put(serverName, limit);
277    return limit;
278  }
279
280  int getNumberOfRulesLoaded() {
281    return this.limitPerRule.size();
282  }
283}