001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
003 * agreements. See the NOTICE file distributed with this work for additional information regarding
004 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
005 * "License"); you may not use this file except in compliance with the License. You may obtain a
006 * copy of the License at
007 * <p>
008 * http://www.apache.org/licenses/LICENSE-2.0
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software distributed under the License
011 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
012 * or implied. See the License for the specific language governing permissions and limitations under
013 * the License.
014 */
015package org.apache.hadoop.hbase.master.balancer;
016
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
035
036/**
037 * This is an optional Cost function designed to allow region count skew across RegionServers. A
038 * rule file is loaded from the local FS or HDFS before balancing. It contains lines of rules. A
039 * rule is composed of a regexp for hostname, and a limit. For example, we could have:
040 * <p>
041 * * rs[0-9] 200 * rs1[0-9] 50
042 * </p>
043 * RegionServers with hostname matching the first rules will have a limit of 200, and the others 50.
044 * If there's no match, a default is set. The costFunction is trying to fill all RegionServers
045 * linearly, meaning that if the global usage is at 50%, then all RegionServers should hold half of
046 * their capacity in terms of regions. In order to use this CostFunction, you need to set the
047 * following options:
048 * <ul>
049 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
050 * <li>hbase.master.balancer.stochastic.heterogeneousRegionCountRulesFile</li>
051 * <li>hbase.master.balancer.stochastic.heterogeneousRegionCountDefault</li>
052 * </ul>
053 * The rule file can be located on local FS or HDFS, depending on the prefix (file//: or hdfs://).
054 */
055@InterfaceAudience.Private
056public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer.CostFunction {
057
058  /**
059   * configuration used for the path where the rule file is stored.
060   */
061  static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE =
062      "hbase.master.balancer.heterogeneousRegionCountRulesFile";
063  private static final Logger LOG =
064      LoggerFactory.getLogger(HeterogeneousRegionCountCostFunction.class);
065  /**
066   * Default rule to apply when the rule file is not found. Default to 200.
067   */
068  private static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT =
069      "hbase.master.balancer.heterogeneousRegionCountDefault";
070  /**
071   * Cost for the function. Default to 500, can be changed.
072   */
073  private static final String REGION_COUNT_SKEW_COST_KEY =
074      "hbase.master.balancer.stochastic.heterogeneousRegionCountCost";
075  private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
076  private final String rulesPath;
077
078  /**
079   * Contains the rules, key is the regexp for ServerName, value is the limit
080   */
081  private final Map<Pattern, Integer> limitPerRule;
082
083  /**
084   * This is a cache, used to not go through all the limitPerRule map when searching for limit
085   */
086  private final Map<ServerName, Integer> limitPerRS;
087  private final Configuration conf;
088  private int defaultNumberOfRegions;
089
090  /**
091   * Total capacity of regions for the cluster, based on the online RS and their associated rules
092   */
093  private int totalCapacity = 0;
094  double overallUsage;
095
096  HeterogeneousRegionCountCostFunction(final Configuration conf) {
097    super(conf);
098    this.conf = conf;
099    this.limitPerRS = new HashMap<>();
100    this.limitPerRule = new HashMap<>();
101    this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
102    this.rulesPath = conf.get(HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE);
103    this.defaultNumberOfRegions =
104        conf.getInt(HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT, 200);
105
106    if (this.defaultNumberOfRegions < 0) {
107      LOG.warn("invalid configuration '" + HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT
108          + "'. Setting default to 200");
109      this.defaultNumberOfRegions = 200;
110    }
111    if (conf.getFloat(StochasticLoadBalancer.RegionCountSkewCostFunction.REGION_COUNT_SKEW_COST_KEY,
112      StochasticLoadBalancer.RegionCountSkewCostFunction.DEFAULT_REGION_COUNT_SKEW_COST) > 0) {
113      LOG.warn("regionCountCost is not set to 0, "
114          + " this will interfere with the HeterogeneousRegionCountCostFunction!");
115    }
116  }
117
118  /**
119   * Called once per LB invocation to give the cost function to initialize it's state, and perform
120   * any costly calculation.
121   */
122  @Override
123  void init(final BaseLoadBalancer.Cluster cluster) {
124    this.cluster = cluster;
125    this.loadRules();
126  }
127
128  @Override
129  protected double cost() {
130    double cost = 0;
131    final double targetUsage = ((double) this.cluster.numRegions / (double) this.totalCapacity);
132
133    for (int i = 0; i < this.cluster.numServers; i++) {
134      // retrieve capacity for each RS
135      final ServerName sn = this.cluster.servers[i];
136      final double limit = this.limitPerRS.getOrDefault(sn, defaultNumberOfRegions);
137      final double nbrRegions = this.cluster.regionsPerServer[i].length;
138      final double usage = nbrRegions / limit;
139      if (usage > targetUsage) {
140        // cost is the number of regions above the local limit
141        final double localCost = (nbrRegions - Math.round(limit * targetUsage)) / limit;
142        cost += localCost;
143      }
144    }
145
146    return cost / (double) this.cluster.numServers;
147  }
148
149  /**
150   * used to load the rule files.
151   */
152  void loadRules() {
153    final List<String> lines = readFile(this.rulesPath);
154    if (null == lines) {
155      LOG.warn("cannot load rules file, keeping latest rules file which has "
156          + this.limitPerRule.size() + " rules");
157      return;
158    }
159
160    LOG.info("loading rules file '" + this.rulesPath + "'");
161    this.limitPerRule.clear();
162    for (final String line : lines) {
163      try {
164        if (line.length() == 0) {
165          continue;
166        }
167        if (line.startsWith("#")) {
168          continue;
169        }
170        final String[] splits = line.split(" ");
171        if (splits.length != 2) {
172          throw new IOException(
173              "line '" + line + "' is malformated, " + "expected [regexp] [limit]. Skipping line");
174        }
175
176        final Pattern pattern = Pattern.compile(splits[0]);
177        final Integer limit = Integer.parseInt(splits[1]);
178        this.limitPerRule.put(pattern, limit);
179      } catch (final IOException | NumberFormatException | PatternSyntaxException e) {
180        LOG.error("error on line: " + e);
181      }
182    }
183    this.rebuildCache();
184  }
185
186  /**
187   * used to read the rule files from either HDFS or local FS
188   */
189  private List<String> readFile(final String filename) {
190    if (null == filename) {
191      return null;
192    }
193    try {
194      if (filename.startsWith("file:")) {
195        return readFileFromLocalFS(filename);
196      }
197      return readFileFromHDFS(filename);
198    } catch (IOException e) {
199      LOG.error("cannot read rules file located at ' " + filename + " ':" + e.getMessage());
200      return null;
201    }
202  }
203
204  /**
205   * used to read the rule files from HDFS
206   */
207  private List<String> readFileFromHDFS(final String filename) throws IOException {
208    final Path path = new Path(filename);
209    final FileSystem fs = FileSystem.get(this.conf);
210    final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
211    return readLines(reader);
212  }
213
214  /**
215   * used to read the rule files from local FS
216   */
217  private List<String> readFileFromLocalFS(final String filename) throws IOException {
218    BufferedReader reader = new BufferedReader(new FileReader(filename));
219    return readLines(reader);
220  }
221
222  private List<String> readLines(BufferedReader reader) throws IOException {
223    final List<String> records = new ArrayList<>();
224    String line;
225    while ((line = reader.readLine()) != null) {
226      records.add(line);
227    }
228    reader.close();
229    return records;
230  }
231
232  /**
233   * Rebuild cache matching ServerNames and their capacity.
234   */
235  private void rebuildCache() {
236    LOG.debug("Rebuilding cache of capacity for each RS");
237    this.limitPerRS.clear();
238    this.totalCapacity = 0;
239    if (null == this.cluster) {
240      return;
241    }
242    for (int i = 0; i < this.cluster.numServers; i++) {
243      final ServerName sn = this.cluster.servers[i];
244      final int capacity = this.findLimitForRS(sn);
245      LOG.debug(sn.getHostname() + " can hold " + capacity + " regions");
246      this.totalCapacity += capacity;
247    }
248    overallUsage = (double) this.cluster.numRegions / (double) this.totalCapacity;
249    LOG.info("Cluster can hold " + this.cluster.numRegions + "/" + this.totalCapacity + " regions ("
250        + Math.round(overallUsage * 100) + "%)");
251    if (overallUsage >= 1) {
252      LOG.warn("Cluster is overused, {}", overallUsage);
253    }
254  }
255
256  /**
257   * Find the limit for a ServerName. If not found then return the default value
258   * @param serverName the server we are looking for
259   * @return the limit
260   */
261  int findLimitForRS(final ServerName serverName) {
262    boolean matched = false;
263    int limit = -1;
264    for (final Map.Entry<Pattern, Integer> entry : this.limitPerRule.entrySet()) {
265      if (entry.getKey().matcher(serverName.getHostname()).matches()) {
266        matched = true;
267        limit = entry.getValue();
268        break;
269      }
270    }
271    if (!matched) {
272      limit = this.defaultNumberOfRegions;
273    }
274    // Feeding cache
275    this.limitPerRS.put(serverName, limit);
276    return limit;
277  }
278
279  int getNumberOfRulesLoaded() {
280    return this.limitPerRule.size();
281  }
282}