001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.rsgroup; 020 021import edu.umd.cs.findbugs.annotations.NonNull; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.Optional; 029import java.util.Set; 030import java.util.TreeMap; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.ClusterMetrics; 033import org.apache.hadoop.hbase.HBaseIOException; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.constraint.ConstraintException; 038import org.apache.hadoop.hbase.master.LoadBalancer; 039import org.apache.hadoop.hbase.master.MasterServices; 040import org.apache.hadoop.hbase.master.RegionPlan; 041import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; 042import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer; 043import org.apache.hadoop.hbase.net.Address; 044import org.apache.hadoop.hbase.util.Pair; 045import org.apache.hadoop.util.ReflectionUtils; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 051import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap; 052import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 053import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 054 055/** 056 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721) 057 * It does region balance based on a table's group membership. 058 * 059 * Most assignment methods contain two exclusive code paths: Online - when the group 060 * table is online and Offline - when it is unavailable. 061 * 062 * During Offline, assignments are assigned based on cached information in zookeeper. 063 * If unavailable (ie bootstrap) then regions are assigned randomly. 064 * 065 * Once the GROUP table has been assigned, the balancer switches to Online and will then 066 * start providing appropriate assignments for user tables. 067 * 068 */ 069@InterfaceAudience.Private 070public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { 071 private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class); 072 073 private Configuration config; 074 private ClusterMetrics clusterStatus; 075 private MasterServices masterServices; 076 private volatile RSGroupInfoManager rsGroupInfoManager; 077 private LoadBalancer internalBalancer; 078 079 /** 080 * Set this key to {@code true} to allow region fallback. 081 * Fallback to the default rsgroup first, then fallback to any group if no online servers in 082 * default rsgroup. 083 * Please keep balancer switch on at the same time, which is relied on to correct misplaced 084 * regions 085 */ 086 public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable"; 087 088 private boolean fallbackEnabled = false; 089 090 /** 091 * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}. 092 */ 093 @InterfaceAudience.Private 094 public RSGroupBasedLoadBalancer() {} 095 096 @Override 097 public Configuration getConf() { 098 return config; 099 } 100 101 @Override 102 public void setConf(Configuration conf) { 103 this.config = conf; 104 if (internalBalancer != null) { 105 internalBalancer.setConf(conf); 106 } 107 } 108 109 @Override 110 public void setClusterMetrics(ClusterMetrics sm) { 111 this.clusterStatus = sm; 112 if (internalBalancer != null) { 113 internalBalancer.setClusterMetrics(sm); 114 } 115 } 116 117 @Override 118 public void setMasterServices(MasterServices masterServices) { 119 this.masterServices = masterServices; 120 } 121 122 /** 123 * Override to balance by RSGroup 124 * not invoke {@link #balanceTable(TableName, Map)} 125 */ 126 @Override 127 public List<RegionPlan> balanceCluster( 128 Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException { 129 if (!isOnline()) { 130 throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME + 131 " is not online, unable to perform balance"); 132 } 133 // Calculate correct assignments and a list of RegionPlan for mis-placed regions 134 Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>> 135 correctedStateAndRegionPlans = correctAssignments(loadOfAllTable); 136 Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable = 137 correctedStateAndRegionPlans.getFirst(); 138 List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond(); 139 // Add RegionPlan for the regions which have been placed according to the region server group 140 // assignment into the movement list 141 try { 142 // For each rsgroup 143 for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) { 144 Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>(); 145 for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable 146 .entrySet()) { 147 TableName tableName = entry.getKey(); 148 String targetRSGroupName = rsGroupInfoManager.getRSGroupOfTable(tableName); 149 if (targetRSGroupName == null) { 150 targetRSGroupName = RSGroupInfo.DEFAULT_GROUP; 151 } 152 if (targetRSGroupName.equals(rsgroup.getName())) { 153 loadOfTablesInGroup.put(tableName, entry.getValue()); 154 } 155 } 156 List<RegionPlan> groupPlans = null; 157 if (!loadOfTablesInGroup.isEmpty()) { 158 LOG.info("Start Generate Balance plan for group: " + rsgroup.getName()); 159 groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup); 160 } 161 if (groupPlans != null) { 162 regionPlans.addAll(groupPlans); 163 } 164 } 165 } catch (IOException exp) { 166 LOG.warn("Exception while balancing cluster.", exp); 167 regionPlans.clear(); 168 } 169 return regionPlans; 170 } 171 172 @Override 173 @NonNull 174 public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions, 175 List<ServerName> servers) throws HBaseIOException { 176 Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap(); 177 List<Pair<List<RegionInfo>, List<ServerName>>> pairs = 178 generateGroupAssignments(regions, servers); 179 for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) { 180 Map<ServerName, List<RegionInfo>> result = 181 this.internalBalancer.roundRobinAssignment(pair.getFirst(), pair.getSecond()); 182 result.forEach((server, regionInfos) -> assignments 183 .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(regionInfos)); 184 } 185 return assignments; 186 } 187 188 @Override 189 @NonNull 190 public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions, 191 List<ServerName> servers) throws HBaseIOException { 192 try { 193 Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>(); 194 List<Pair<List<RegionInfo>, List<ServerName>>> pairs = 195 generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers); 196 for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) { 197 List<RegionInfo> regionList = pair.getFirst(); 198 Map<RegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap(); 199 regionList.forEach(r -> currentAssignmentMap.put(r, regions.get(r))); 200 Map<ServerName, List<RegionInfo>> pairResult = 201 this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond()); 202 pairResult.forEach((server, rs) -> assignments 203 .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(rs)); 204 } 205 return assignments; 206 } catch (IOException e) { 207 throw new HBaseIOException("Failed to do online retain assignment", e); 208 } 209 } 210 211 @Override 212 public ServerName randomAssignment(RegionInfo region, 213 List<ServerName> servers) throws HBaseIOException { 214 List<Pair<List<RegionInfo>, List<ServerName>>> pairs = 215 generateGroupAssignments(Lists.newArrayList(region), servers); 216 List<ServerName> filteredServers = pairs.iterator().next().getSecond(); 217 return this.internalBalancer.randomAssignment(region, filteredServers); 218 } 219 220 private List<Pair<List<RegionInfo>, List<ServerName>>> generateGroupAssignments( 221 List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException { 222 try { 223 ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create(); 224 ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create(); 225 RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); 226 for (RegionInfo region : regions) { 227 String groupName = 228 Optional.ofNullable(rsGroupInfoManager.getRSGroupOfTable(region.getTable())) 229 .orElse(defaultInfo.getName()); 230 regionMap.put(groupName, region); 231 } 232 for (String groupKey : regionMap.keySet()) { 233 RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey); 234 serverMap.putAll(groupKey, filterOfflineServers(info, servers)); 235 } 236 237 List<Pair<List<RegionInfo>, List<ServerName>>> result = Lists.newArrayList(); 238 List<RegionInfo> fallbackRegions = Lists.newArrayList(); 239 for (String groupKey : regionMap.keySet()) { 240 if (serverMap.get(groupKey).isEmpty()) { 241 fallbackRegions.addAll(regionMap.get(groupKey)); 242 } else { 243 result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey))); 244 } 245 } 246 if (!fallbackRegions.isEmpty()) { 247 List<ServerName> candidates = null; 248 if (isFallbackEnabled()) { 249 candidates = getFallBackCandidates(servers); 250 } 251 candidates = (candidates == null || candidates.isEmpty()) ? 252 Lists.newArrayList(BOGUS_SERVER_NAME) : candidates; 253 result.add(Pair.newPair(fallbackRegions, candidates)); 254 } 255 return result; 256 } catch(IOException e) { 257 throw new HBaseIOException("Failed to generate group assignments", e); 258 } 259 } 260 261 private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo, 262 List<ServerName> onlineServers) { 263 if (RSGroupInfo != null) { 264 return filterServers(RSGroupInfo.getServers(), onlineServers); 265 } else { 266 LOG.warn("RSGroup Information found to be null. Some regions might be unassigned."); 267 return Collections.emptyList(); 268 } 269 } 270 271 /** 272 * Filter servers based on the online servers. 273 * <p/> 274 * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having 275 * its contains()'s time complexity as O(logn), which is good enough. 276 * <p/> 277 * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if 278 * needed. 279 * @param servers the servers 280 * @param onlineServers List of servers which are online. 281 * @return the list 282 */ 283 private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) { 284 ArrayList<ServerName> finalList = new ArrayList<>(); 285 for (ServerName onlineServer : onlineServers) { 286 if (servers.contains(onlineServer.getAddress())) { 287 finalList.add(onlineServer); 288 } 289 } 290 return finalList; 291 } 292 293 private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>> 294 correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments) 295 throws IOException { 296 // To return 297 Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>(); 298 List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>(); 299 for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments 300 .entrySet()) { 301 TableName tableName = assignments.getKey(); 302 Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue(); 303 Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>(); 304 RSGroupInfo targetRSGInfo = null; 305 try { 306 String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName); 307 if (groupName == null) { 308 LOG.debug("Group not found for table " + tableName + ", using default"); 309 groupName = RSGroupInfo.DEFAULT_GROUP; 310 } 311 targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName); 312 } catch (IOException exp) { 313 LOG.debug("RSGroup information null for region of table " + tableName, exp); 314 } 315 for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) { 316 ServerName currentHostServer = serverRegionMap.getKey(); 317 List<RegionInfo> regionInfoList = serverRegionMap.getValue(); 318 if (targetRSGInfo == null 319 || !targetRSGInfo.containsServer(currentHostServer.getAddress())) { 320 regionInfoList.forEach(regionInfo -> { 321 regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null)); 322 }); 323 } else { 324 correctServerRegion.put(currentHostServer, regionInfoList); 325 } 326 } 327 correctAssignments.put(tableName, correctServerRegion); 328 } 329 // Return correct assignments and region movement plan for mis-placed regions together 330 return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>( 331 correctAssignments, regionPlansForMisplacedRegions); 332 } 333 334 @Override 335 public void initialize() throws HBaseIOException { 336 try { 337 if (rsGroupInfoManager == null) { 338 List<RSGroupAdminEndpoint> cps = 339 masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class); 340 if (cps.size() != 1) { 341 String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size(); 342 LOG.error(msg); 343 throw new HBaseIOException(msg); 344 } 345 rsGroupInfoManager = cps.get(0).getGroupInfoManager(); 346 if(rsGroupInfoManager == null){ 347 String msg = "RSGroupInfoManager hasn't been initialized"; 348 LOG.error(msg); 349 throw new HBaseIOException(msg); 350 } 351 rsGroupInfoManager.start(); 352 } 353 } catch (IOException e) { 354 throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e); 355 } 356 357 // Create the balancer 358 Class<? extends LoadBalancer> balancerClass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS, 359 StochasticLoadBalancer.class, LoadBalancer.class); 360 if (this.getClass().isAssignableFrom(balancerClass)) { 361 LOG.warn("The internal balancer of RSGroupBasedLoadBalancer cannot be itself, " + 362 "falling back to the default LoadBalancer class"); 363 balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass(); 364 } 365 internalBalancer = ReflectionUtils.newInstance(balancerClass, config); 366 internalBalancer.setMasterServices(masterServices); 367 if (clusterStatus != null) { 368 internalBalancer.setClusterMetrics(clusterStatus); 369 } 370 internalBalancer.setConf(config); 371 internalBalancer.initialize(); 372 // init fallback groups 373 this.fallbackEnabled = config.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false); 374 } 375 376 public boolean isOnline() { 377 if (this.rsGroupInfoManager == null) { 378 return false; 379 } 380 381 return this.rsGroupInfoManager.isOnline(); 382 } 383 384 public boolean isFallbackEnabled() { 385 return fallbackEnabled; 386 } 387 388 @Override 389 public void regionOnline(RegionInfo regionInfo, ServerName sn) { 390 } 391 392 @Override 393 public void regionOffline(RegionInfo regionInfo) { 394 } 395 396 @Override 397 public void onConfigurationChange(Configuration conf) { 398 boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false); 399 if (fallbackEnabled != newFallbackEnabled) { 400 LOG.info("Changing the value of {} from {} to {}", FALLBACK_GROUP_ENABLE_KEY, 401 fallbackEnabled, newFallbackEnabled); 402 fallbackEnabled = newFallbackEnabled; 403 } 404 } 405 406 @Override 407 public void stop(String why) { 408 } 409 410 @Override 411 public boolean isStopped() { 412 return false; 413 } 414 415 public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) { 416 this.rsGroupInfoManager = rsGroupInfoManager; 417 } 418 419 @Override 420 public void postMasterStartupInitialize() { 421 this.internalBalancer.postMasterStartupInitialize(); 422 } 423 424 public void updateBalancerStatus(boolean status) { 425 internalBalancer.updateBalancerStatus(status); 426 } 427 428 /** 429 * can achieve table balanced rather than overall balanced 430 */ 431 @Override 432 public List<RegionPlan> balanceTable(TableName tableName, 433 Map<ServerName, List<RegionInfo>> loadOfOneTable) { 434 if (!isOnline()) { 435 LOG.error(RSGroupInfoManager.class.getSimpleName() 436 + " is not online, unable to perform balanceTable"); 437 return null; 438 } 439 Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfThisTable = new HashMap<>(); 440 loadOfThisTable.put(tableName, loadOfOneTable); 441 Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>> 442 correctedStateAndRegionPlans; 443 // Calculate correct assignments and a list of RegionPlan for mis-placed regions 444 try { 445 correctedStateAndRegionPlans = correctAssignments(loadOfThisTable); 446 } catch (IOException e) { 447 LOG.error("get correct assignments and mis-placed regions error ", e); 448 return null; 449 } 450 Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfThisTable = 451 correctedStateAndRegionPlans.getFirst(); 452 List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond(); 453 List<RegionPlan> tablePlans = 454 this.internalBalancer.balanceTable(tableName, correctedLoadOfThisTable.get(tableName)); 455 456 if (tablePlans != null) { 457 regionPlans.addAll(tablePlans); 458 } 459 return regionPlans; 460 } 461 462 private List<ServerName> getFallBackCandidates(List<ServerName> servers) { 463 List<ServerName> serverNames = null; 464 try { 465 RSGroupInfo info = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); 466 serverNames = filterOfflineServers(info, servers); 467 } catch (IOException e) { 468 LOG.error("Failed to get default rsgroup info to fallback", e); 469 } 470 return serverNames == null || serverNames.isEmpty() ? servers : serverNames; 471 } 472}