001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.BufferedReader; 022import java.io.FileReader; 023import java.io.IOException; 024import java.io.InputStreamReader; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.regex.Pattern; 030import java.util.regex.PatternSyntaxException; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * This is an optional Cost function designed to allow region count skew across RegionServers. A 041 * rule file is loaded from the local FS or HDFS before balancing. It contains lines of rules. A 042 * rule is composed of a regexp for hostname, and a limit. For example, we could have: 043 * <p> 044 * * rs[0-9] 200 * rs1[0-9] 50 045 * </p> 046 * RegionServers with hostname matching the first rules will have a limit of 200, and the others 50. 047 * If there's no match, a default is set. The costFunction is trying to fill all RegionServers 048 * linearly, meaning that if the global usage is at 50%, then all RegionServers should hold half of 049 * their capacity in terms of regions. In order to use this CostFunction, you need to set the 050 * following options: 051 * <ul> 052 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li> 053 * <li>hbase.master.balancer.stochastic.heterogeneousRegionCountRulesFile</li> 054 * <li>hbase.master.balancer.stochastic.heterogeneousRegionCountDefault</li> 055 * </ul> 056 * The rule file can be located on local FS or HDFS, depending on the prefix (file//: or hdfs://). 057 */ 058@InterfaceAudience.Private 059public class HeterogeneousRegionCountCostFunction extends CostFunction { 060 061 /** 062 * configuration used for the path where the rule file is stored. 063 */ 064 static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE = 065 "hbase.master.balancer.heterogeneousRegionCountRulesFile"; 066 private static final Logger LOG = 067 LoggerFactory.getLogger(HeterogeneousRegionCountCostFunction.class); 068 /** 069 * Default rule to apply when the rule file is not found. Default to 200. 070 */ 071 private static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT = 072 "hbase.master.balancer.heterogeneousRegionCountDefault"; 073 /** 074 * Cost for the function. Default to 500, can be changed. 075 */ 076 private static final String REGION_COUNT_SKEW_COST_KEY = 077 "hbase.master.balancer.stochastic.heterogeneousRegionCountCost"; 078 private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500; 079 private final String rulesPath; 080 081 /** 082 * Contains the rules, key is the regexp for ServerName, value is the limit 083 */ 084 private final Map<Pattern, Integer> limitPerRule; 085 086 /** 087 * This is a cache, used to not go through all the limitPerRule map when searching for limit 088 */ 089 private final Map<ServerName, Integer> limitPerRS; 090 private final Configuration conf; 091 private int defaultNumberOfRegions; 092 093 /** 094 * Total capacity of regions for the cluster, based on the online RS and their associated rules 095 */ 096 private int totalCapacity = 0; 097 double overallUsage; 098 099 HeterogeneousRegionCountCostFunction(final Configuration conf) { 100 this.conf = conf; 101 this.limitPerRS = new HashMap<>(); 102 this.limitPerRule = new HashMap<>(); 103 this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST)); 104 this.rulesPath = conf.get(HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE); 105 this.defaultNumberOfRegions = 106 conf.getInt(HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT, 200); 107 108 if (this.defaultNumberOfRegions < 0) { 109 LOG.warn("invalid configuration '" + HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT 110 + "'. Setting default to 200"); 111 this.defaultNumberOfRegions = 200; 112 } 113 if ( 114 conf.getFloat(RegionCountSkewCostFunction.REGION_COUNT_SKEW_COST_KEY, 115 RegionCountSkewCostFunction.DEFAULT_REGION_COUNT_SKEW_COST) > 0 116 ) { 117 LOG.warn("regionCountCost is not set to 0, " 118 + " this will interfere with the HeterogeneousRegionCountCostFunction!"); 119 } 120 } 121 122 /** 123 * Called once per LB invocation to give the cost function to initialize it's state, and perform 124 * any costly calculation. 125 */ 126 @Override 127 void prepare(final BalancerClusterState cluster) { 128 this.cluster = cluster; 129 this.loadRules(); 130 } 131 132 @Override 133 protected double cost() { 134 double cost = 0; 135 final double targetUsage = ((double) this.cluster.numRegions / (double) this.totalCapacity); 136 137 for (int i = 0; i < this.cluster.numServers; i++) { 138 // retrieve capacity for each RS 139 final ServerName sn = this.cluster.servers[i]; 140 final double limit = this.limitPerRS.getOrDefault(sn, defaultNumberOfRegions); 141 final double nbrRegions = this.cluster.regionsPerServer[i].length; 142 final double usage = nbrRegions / limit; 143 if (usage > targetUsage) { 144 // cost is the number of regions above the local limit 145 final double localCost = (nbrRegions - Math.round(limit * targetUsage)) / limit; 146 cost += localCost; 147 } 148 } 149 150 return cost / (double) this.cluster.numServers; 151 } 152 153 /** 154 * used to load the rule files. 155 */ 156 @RestrictedApi(explanation = "Should only be called in tests", link = "", 157 allowedOnPath = ".*(/src/test/.*|HeterogeneousRegionCountCostFunction).java") 158 void loadRules() { 159 final List<String> lines = readFile(this.rulesPath); 160 if (null == lines) { 161 LOG.warn("cannot load rules file, keeping latest rules file which has " 162 + this.limitPerRule.size() + " rules"); 163 return; 164 } 165 166 LOG.info("loading rules file '" + this.rulesPath + "'"); 167 this.limitPerRule.clear(); 168 for (final String line : lines) { 169 try { 170 if (line.length() == 0) { 171 continue; 172 } 173 if (line.startsWith("#")) { 174 continue; 175 } 176 final String[] splits = line.split(" "); 177 if (splits.length != 2) { 178 throw new IOException( 179 "line '" + line + "' is malformated, " + "expected [regexp] [limit]. Skipping line"); 180 } 181 182 final Pattern pattern = Pattern.compile(splits[0]); 183 final Integer limit = Integer.parseInt(splits[1]); 184 this.limitPerRule.put(pattern, limit); 185 } catch (final IOException | NumberFormatException | PatternSyntaxException e) { 186 LOG.error("error on line: " + e); 187 } 188 } 189 this.rebuildCache(); 190 } 191 192 /** 193 * used to read the rule files from either HDFS or local FS 194 */ 195 private List<String> readFile(final String filename) { 196 if (null == filename) { 197 return null; 198 } 199 try { 200 if (filename.startsWith("file:")) { 201 return readFileFromLocalFS(filename); 202 } 203 return readFileFromHDFS(filename); 204 } catch (IOException e) { 205 LOG.error("cannot read rules file located at ' " + filename + " ':" + e.getMessage()); 206 return null; 207 } 208 } 209 210 /** 211 * used to read the rule files from HDFS 212 */ 213 private List<String> readFileFromHDFS(final String filename) throws IOException { 214 final Path path = new Path(filename); 215 final FileSystem fs = FileSystem.get(this.conf); 216 final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path))); 217 return readLines(reader); 218 } 219 220 /** 221 * used to read the rule files from local FS 222 */ 223 private List<String> readFileFromLocalFS(final String filename) throws IOException { 224 BufferedReader reader = new BufferedReader(new FileReader(filename)); 225 return readLines(reader); 226 } 227 228 private List<String> readLines(BufferedReader reader) throws IOException { 229 final List<String> records = new ArrayList<>(); 230 try { 231 String line; 232 while ((line = reader.readLine()) != null) { 233 records.add(line); 234 } 235 } finally { 236 reader.close(); 237 } 238 return records; 239 } 240 241 /** 242 * Rebuild cache matching ServerNames and their capacity. 243 */ 244 private void rebuildCache() { 245 LOG.debug("Rebuilding cache of capacity for each RS"); 246 this.limitPerRS.clear(); 247 this.totalCapacity = 0; 248 if (null == this.cluster) { 249 return; 250 } 251 for (int i = 0; i < this.cluster.numServers; i++) { 252 final ServerName sn = this.cluster.servers[i]; 253 final int capacity = this.findLimitForRS(sn); 254 LOG.debug(sn.getHostname() + " can hold " + capacity + " regions"); 255 this.totalCapacity += capacity; 256 } 257 overallUsage = (double) this.cluster.numRegions / (double) this.totalCapacity; 258 LOG.info("Cluster can hold " + this.cluster.numRegions + "/" + this.totalCapacity + " regions (" 259 + Math.round(overallUsage * 100) + "%)"); 260 if (overallUsage >= 1) { 261 LOG.warn("Cluster is overused, {}", overallUsage); 262 } 263 } 264 265 /** 266 * Find the limit for a ServerName. If not found then return the default value 267 * @param serverName the server we are looking for 268 * @return the limit 269 */ 270 int findLimitForRS(final ServerName serverName) { 271 boolean matched = false; 272 int limit = -1; 273 for (final Map.Entry<Pattern, Integer> entry : this.limitPerRule.entrySet()) { 274 if (entry.getKey().matcher(serverName.getHostname()).matches()) { 275 matched = true; 276 limit = entry.getValue(); 277 break; 278 } 279 } 280 if (!matched) { 281 limit = this.defaultNumberOfRegions; 282 } 283 // Feeding cache 284 this.limitPerRS.put(serverName, limit); 285 return limit; 286 } 287 288 int getNumberOfRulesLoaded() { 289 return this.limitPerRule.size(); 290 } 291}