/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This is an optional cost function designed to allow region count skew across RegionServers. A
 * rule file is loaded from the local FS or HDFS before balancing. It contains lines of rules. A
 * rule is composed of a regexp for hostname and a limit. For example, we could have:
 * <pre>
 * rs[0-9] 200
 * rs1[0-9] 50
 * </pre>
 * RegionServers whose hostname matches the first rule will have a limit of 200; those matching the
 * second rule, a limit of 50. If there is no match, a default is used. The cost function tries to
 * fill all RegionServers linearly, meaning that if the global usage is at 50%, then all
 * RegionServers should hold half of their capacity in terms of regions. In order to use this cost
 * function, you need to set the following options:
 * <ul>
 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
 * <li>hbase.master.balancer.heterogeneousRegionCountRulesFile</li>
 * <li>hbase.master.balancer.heterogeneousRegionCountDefault</li>
 * </ul>
 * The rule file can be located on the local FS or on HDFS, depending on the prefix (file:// or
 * hdfs://).
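 * <p>
 * As an illustration (the rules file location and values below are examples, not defaults), the
 * balancer could be configured with:
 * </p>
 * <pre>
 * hbase.master.balancer.stochastic.additionalCostFunctions =
 *   org.apache.hadoop.hbase.master.balancer.HeterogeneousRegionCountCostFunction
 * hbase.master.balancer.heterogeneousRegionCountRulesFile = hdfs:///path/to/rules.txt
 * hbase.master.balancer.heterogeneousRegionCountDefault = 200
 * </pre>
 * <p>
 * The regular region count skew cost ({@link RegionCountSkewCostFunction}) should be set to 0,
 * otherwise it interferes with this function (see the constructor warning).
 * </p>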
 */
@InterfaceAudience.Private
public class HeterogeneousRegionCountCostFunction extends CostFunction {

  /**
   * Configuration key for the path where the rule file is stored.
   */
  static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE =
    "hbase.master.balancer.heterogeneousRegionCountRulesFile";
  private static final Logger LOG =
    LoggerFactory.getLogger(HeterogeneousRegionCountCostFunction.class);
  /**
   * Configuration key for the default limit applied when no rule matches a RegionServer. Defaults
   * to 200.
   */
  private static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT =
    "hbase.master.balancer.heterogeneousRegionCountDefault";
  /**
   * Configuration key for the multiplier of this cost function. Defaults to 500 and can be
   * changed.
   */
  private static final String REGION_COUNT_SKEW_COST_KEY =
    "hbase.master.balancer.stochastic.heterogeneousRegionCountCost";
  private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
  private final String rulesPath;

  /**
   * Contains the rules: the key is the regexp matched against hostnames, the value is the limit.
   */
  private final Map<Pattern, Integer> limitPerRule;

  /**
   * Cache used to avoid going through the whole limitPerRule map every time a limit is looked up.
   */
  private final Map<ServerName, Integer> limitPerRS;
  private final Configuration conf;
  private int defaultNumberOfRegions;

  /**
   * Total capacity of regions for the cluster, based on the online RS and their associated rules.
   */
  private int totalCapacity = 0;
  double overallUsage;

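  /**
   * Builds the cost function from the configuration: reads the cost multiplier, the rules file
   * path and the default region limit, and warns when the regular region count cost is still
   * enabled, since that would interfere with this function.
   */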
  HeterogeneousRegionCountCostFunction(final Configuration conf) {
    this.conf = conf;
    this.limitPerRS = new HashMap<>();
    this.limitPerRule = new HashMap<>();
    this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
    this.rulesPath = conf.get(HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE);
    this.defaultNumberOfRegions =
      conf.getInt(HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT, 200);

    if (this.defaultNumberOfRegions < 0) {
      LOG.warn("invalid configuration '" + HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT
        + "'. Setting default to 200");
      this.defaultNumberOfRegions = 200;
    }
    if (
      conf.getFloat(RegionCountSkewCostFunction.REGION_COUNT_SKEW_COST_KEY,
        RegionCountSkewCostFunction.DEFAULT_REGION_COUNT_SKEW_COST) > 0
    ) {
      LOG.warn("regionCountCost is not set to 0, "
        + "this will interfere with the HeterogeneousRegionCountCostFunction!");
    }
  }

  /**
   * Called once per LB invocation to give the cost function a chance to initialize its state and
   * perform any costly calculation.
   */
  @Override
  void prepare(final BalancerClusterState cluster) {
    this.cluster = cluster;
    this.loadRules();
  }

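  /**
   * The cost is the sum, over all RegionServers whose usage exceeds the cluster-wide target usage,
   * of the number of regions above their expected share (limit * targetUsage) normalized by their
   * limit, divided by the number of servers. The target usage is the total number of regions
   * divided by the total capacity derived from the rules.
   */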
  @Override
  protected double cost() {
    double cost = 0;
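    // The target usage is the fraction of the total rule-based capacity currently used by the
    // cluster; each RegionServer is expected to be filled to that same fraction of its own limit.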
    final double targetUsage = ((double) this.cluster.numRegions / (double) this.totalCapacity);

    for (int i = 0; i < this.cluster.numServers; i++) {
      // retrieve the capacity of each RS
      final ServerName sn = this.cluster.servers[i];
      final double limit = this.limitPerRS.getOrDefault(sn, defaultNumberOfRegions);
      final double nbrRegions = this.cluster.regionsPerServer[i].length;
      final double usage = nbrRegions / limit;
      if (usage > targetUsage) {
        // cost is the number of regions above this server's expected share, as a fraction of its
        // limit
        final double localCost = (nbrRegions - Math.round(limit * targetUsage)) / limit;
        cost += localCost;
      }
    }

    return cost / (double) this.cluster.numServers;
  }

  /**
   * Loads the rules file. Each non-empty, non-comment ('#') line is expected to contain a hostname
   * regexp followed by a limit, separated by a space; malformed lines are skipped.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*(/src/test/.*|HeterogeneousRegionCountCostFunction).java")
  void loadRules() {
    final List<String> lines = readFile(this.rulesPath);
    if (null == lines) {
      LOG.warn("cannot load rules file, keeping latest rules file which has "
        + this.limitPerRule.size() + " rules");
      return;
    }

    LOG.info("loading rules file '" + this.rulesPath + "'");
    this.limitPerRule.clear();
    for (final String line : lines) {
      try {
        if (line.length() == 0) {
          continue;
        }
        if (line.startsWith("#")) {
          continue;
        }
        final String[] splits = line.split(" ");
        if (splits.length != 2) {
          throw new IOException(
            "line '" + line + "' is malformed, expected [regexp] [limit]. Skipping line");
        }

        final Pattern pattern = Pattern.compile(splits[0]);
        final Integer limit = Integer.parseInt(splits[1]);
        this.limitPerRule.put(pattern, limit);
      } catch (final IOException | NumberFormatException | PatternSyntaxException e) {
        LOG.error("error parsing line '" + line + "': " + e);
      }
    }
    this.rebuildCache();
  }

  /**
   * used to read the rule file from either HDFS or local FS
   */
  private List<String> readFile(final String filename) {
    if (null == filename) {
      return null;
    }
    try {
      if (filename.startsWith("file:")) {
        return readFileFromLocalFS(filename);
      }
      return readFileFromHDFS(filename);
    } catch (IOException e) {
      LOG.error("cannot read rules file located at '" + filename + "': " + e.getMessage());
      return null;
    }
  }

  /**
   * used to read the rule file from HDFS
   */
  private List<String> readFileFromHDFS(final String filename) throws IOException {
    final Path path = new Path(filename);
    final FileSystem fs = FileSystem.get(this.conf);
    final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
    return readLines(reader);
  }

  /**
   * used to read the rule file from local FS
   */
  private List<String> readFileFromLocalFS(final String filename) throws IOException {
    BufferedReader reader = new BufferedReader(new FileReader(filename));
    return readLines(reader);
  }

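  /**
   * Reads all lines from the given reader and closes it.
   */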
  private List<String> readLines(BufferedReader reader) throws IOException {
    final List<String> records = new ArrayList<>();
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        records.add(line);
      }
    } finally {
      reader.close();
    }
    return records;
  }

  /**
   * Rebuilds the cache matching ServerNames and their capacity.
   */
  private void rebuildCache() {
    LOG.debug("Rebuilding cache of capacity for each RS");
    this.limitPerRS.clear();
    this.totalCapacity = 0;
    if (null == this.cluster) {
      return;
    }
    for (int i = 0; i < this.cluster.numServers; i++) {
      final ServerName sn = this.cluster.servers[i];
      final int capacity = this.findLimitForRS(sn);
      LOG.debug(sn.getHostname() + " can hold " + capacity + " regions");
      this.totalCapacity += capacity;
    }
    overallUsage = (double) this.cluster.numRegions / (double) this.totalCapacity;
    LOG.info("Cluster can hold " + this.cluster.numRegions + "/" + this.totalCapacity + " regions ("
      + Math.round(overallUsage * 100) + "%)");
    if (overallUsage >= 1) {
      LOG.warn("Cluster is at or over capacity, usage={}", overallUsage);
    }
  }

  /**
   * Finds the limit for a ServerName. If no rule matches, the default limit is returned.
   * @param serverName the server we are looking for
   * @return the limit
   */
  int findLimitForRS(final ServerName serverName) {
    boolean matched = false;
    int limit = -1;
    for (final Map.Entry<Pattern, Integer> entry : this.limitPerRule.entrySet()) {
      if (entry.getKey().matcher(serverName.getHostname()).matches()) {
        matched = true;
        limit = entry.getValue();
        break;
      }
    }
    if (!matched) {
      limit = this.defaultNumberOfRegions;
    }
    // cache the result so the regexps do not have to be evaluated again for this server
    this.limitPerRS.put(serverName, limit);
    return limit;
  }

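  /**
   * Returns the number of rules currently loaded.
   */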
  int getNumberOfRulesLoaded() {
    return this.limitPerRule.size();
  }
}