001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.rsgroup; 019 020import edu.umd.cs.findbugs.annotations.NonNull; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.Set; 028import java.util.TreeMap; 029import java.util.stream.Collectors; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.ClusterMetrics; 032import org.apache.hadoop.hbase.HBaseIOException; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.constraint.ConstraintException; 038import org.apache.hadoop.hbase.favored.FavoredNodesManager; 039import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; 040import org.apache.hadoop.hbase.master.LoadBalancer; 041import org.apache.hadoop.hbase.master.MasterServices; 042import org.apache.hadoop.hbase.master.RegionPlan; 043import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider; 044import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; 045import org.apache.hadoop.hbase.master.balancer.MasterClusterInfoProvider; 046import org.apache.hadoop.hbase.net.Address; 047import org.apache.hadoop.hbase.util.Pair; 048import org.apache.hadoop.hbase.util.ReflectionUtils; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 054import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap; 055import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 056import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 057 058/** 059 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721) It does 060 * region balance based on a table's group membership. 061 * <p/> 062 * Most assignment methods contain two exclusive code paths: Online - when the group table is online 063 * and Offline - when it is unavailable. 064 * <p/> 065 * During Offline, assignments are assigned based on cached information in zookeeper. If unavailable 066 * (ie bootstrap) then regions are assigned randomly. 067 * <p/> 068 * Once the GROUP table has been assigned, the balancer switches to Online and will then start 069 * providing appropriate assignments for user tables. 070 */ 071@InterfaceAudience.Private 072public class RSGroupBasedLoadBalancer implements LoadBalancer { 073 private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class); 074 075 private MasterServices masterServices; 076 private ClusterInfoProvider provider; 077 private FavoredNodesManager favoredNodesManager; 078 private volatile RSGroupInfoManager rsGroupInfoManager; 079 private volatile LoadBalancer internalBalancer; 080 081 /** 082 * Set this key to {@code true} to allow region fallback. Fallback to the default rsgroup first, 083 * then fallback to any group if no online servers in default rsgroup. Please keep balancer switch 084 * on at the same time, which is relied on to correct misplaced regions 085 */ 086 public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable"; 087 088 private volatile boolean fallbackEnabled = false; 089 090 /** 091 * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}. 092 */ 093 @InterfaceAudience.Private 094 public RSGroupBasedLoadBalancer() { 095 } 096 097 // must be called after calling initialize 098 @Override 099 public synchronized void updateClusterMetrics(ClusterMetrics sm) { 100 assert internalBalancer != null; 101 internalBalancer.updateClusterMetrics(sm); 102 } 103 104 @Override 105 public synchronized void 106 updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) { 107 internalBalancer.updateBalancerLoadInfo(loadOfAllTable); 108 } 109 110 public void setMasterServices(MasterServices masterServices) { 111 this.masterServices = masterServices; 112 } 113 114 /** 115 * Balance by RSGroup. 116 */ 117 @Override 118 public synchronized List<RegionPlan> balanceCluster( 119 Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException { 120 if (!isOnline()) { 121 throw new ConstraintException( 122 RSGroupInfoManager.class.getSimpleName() + " is not online, unable to perform balance"); 123 } 124 125 // Calculate correct assignments and a list of RegionPlan for mis-placed regions 126 Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, 127 List<RegionPlan>> correctedStateAndRegionPlans = correctAssignments(loadOfAllTable); 128 Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable = 129 correctedStateAndRegionPlans.getFirst(); 130 List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond(); 131 RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); 132 // Add RegionPlan 133 // for the regions which have been placed according to the region server group assignment 134 // into the movement list 135 try { 136 // For each rsgroup 137 for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) { 138 LOG.debug("Balancing RSGroup={}", rsgroup.getName()); 139 Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>(); 140 for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable 141 .entrySet()) { 142 TableName tableName = entry.getKey(); 143 RSGroupInfo targetRSGInfo = RSGroupUtil 144 .getRSGroupInfo(masterServices, rsGroupInfoManager, tableName).orElse(defaultInfo); 145 if (targetRSGInfo.getName().equals(rsgroup.getName())) { 146 loadOfTablesInGroup.put(tableName, entry.getValue()); 147 } 148 } 149 List<RegionPlan> groupPlans = null; 150 if (!loadOfTablesInGroup.isEmpty()) { 151 LOG.info("Start Generate Balance plan for group: " + rsgroup.getName()); 152 groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup); 153 } 154 if (groupPlans != null) { 155 regionPlans.addAll(groupPlans); 156 } 157 } 158 } catch (IOException exp) { 159 LOG.warn("Exception while balancing cluster.", exp); 160 regionPlans.clear(); 161 } 162 163 // Return the whole movement list 164 return regionPlans; 165 } 166 167 @Override 168 @NonNull 169 public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions, 170 List<ServerName> servers) throws IOException { 171 Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap(); 172 List<Pair<List<RegionInfo>, List<ServerName>>> pairs = 173 generateGroupAssignments(regions, servers); 174 for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) { 175 Map<ServerName, List<RegionInfo>> result = 176 this.internalBalancer.roundRobinAssignment(pair.getFirst(), pair.getSecond()); 177 result.forEach((server, regionInfos) -> assignments 178 .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(regionInfos)); 179 } 180 return assignments; 181 } 182 183 @Override 184 @NonNull 185 public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions, 186 List<ServerName> servers) throws HBaseIOException { 187 try { 188 Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>(); 189 List<Pair<List<RegionInfo>, List<ServerName>>> pairs = 190 generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers); 191 for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) { 192 List<RegionInfo> regionList = pair.getFirst(); 193 Map<RegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap(); 194 regionList.forEach(r -> currentAssignmentMap.put(r, regions.get(r))); 195 Map<ServerName, List<RegionInfo>> pairResult = 196 this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond()); 197 pairResult.forEach((server, rs) -> assignments 198 .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(rs)); 199 } 200 return assignments; 201 } catch (IOException e) { 202 throw new HBaseIOException("Failed to do online retain assignment", e); 203 } 204 } 205 206 @Override 207 public ServerName randomAssignment(RegionInfo region, List<ServerName> servers) 208 throws IOException { 209 List<Pair<List<RegionInfo>, List<ServerName>>> pairs = 210 generateGroupAssignments(Lists.newArrayList(region), servers); 211 List<ServerName> filteredServers = pairs.iterator().next().getSecond(); 212 return this.internalBalancer.randomAssignment(region, filteredServers); 213 } 214 215 private List<Pair<List<RegionInfo>, List<ServerName>>> generateGroupAssignments( 216 List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException { 217 try { 218 ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create(); 219 ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create(); 220 RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); 221 for (RegionInfo region : regions) { 222 String groupName = 223 RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable()) 224 .orElse(defaultInfo).getName(); 225 regionMap.put(groupName, region); 226 } 227 for (String groupKey : regionMap.keySet()) { 228 RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey); 229 serverMap.putAll(groupKey, filterOfflineServers(info, servers)); 230 } 231 232 List<Pair<List<RegionInfo>, List<ServerName>>> result = Lists.newArrayList(); 233 List<RegionInfo> fallbackRegions = Lists.newArrayList(); 234 for (String groupKey : regionMap.keySet()) { 235 if (serverMap.get(groupKey).isEmpty()) { 236 fallbackRegions.addAll(regionMap.get(groupKey)); 237 } else { 238 result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey))); 239 } 240 } 241 if (!fallbackRegions.isEmpty()) { 242 List<ServerName> candidates = null; 243 if (isFallbackEnabled()) { 244 if (LOG.isDebugEnabled()) { 245 LOG.debug("Falling back {} regions to servers outside their RSGroup. Regions: {}", 246 fallbackRegions.size(), fallbackRegions.stream() 247 .map(RegionInfo::getRegionNameAsString).collect(Collectors.toSet())); 248 } 249 candidates = getFallBackCandidates(servers); 250 } 251 candidates = (candidates == null || candidates.isEmpty()) 252 ? Lists.newArrayList(BOGUS_SERVER_NAME) 253 : candidates; 254 result.add(Pair.newPair(fallbackRegions, candidates)); 255 } 256 return result; 257 } catch (IOException e) { 258 throw new HBaseIOException("Failed to generate group assignments", e); 259 } 260 } 261 262 private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo, 263 List<ServerName> onlineServers) { 264 if (RSGroupInfo != null) { 265 return filterServers(RSGroupInfo.getServers(), onlineServers); 266 } else { 267 LOG.warn("RSGroup Information found to be null. Some regions might be unassigned."); 268 return Collections.emptyList(); 269 } 270 } 271 272 /** 273 * Filter servers based on the online servers. 274 * <p/> 275 * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having 276 * its contains()'s time complexity as O(logn), which is good enough. 277 * <p/> 278 * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if 279 * needed. 280 * @param servers the servers 281 * @param onlineServers List of servers which are online. 282 * @return the list 283 */ 284 private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) { 285 ArrayList<ServerName> finalList = new ArrayList<>(); 286 for (ServerName onlineServer : onlineServers) { 287 if (servers.contains(onlineServer.getAddress())) { 288 finalList.add(onlineServer); 289 } 290 } 291 return finalList; 292 } 293 294 private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>> 295 correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments) 296 throws IOException { 297 // To return 298 Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>(); 299 List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>(); 300 RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); 301 for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments 302 .entrySet()) { 303 TableName tableName = assignments.getKey(); 304 Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue(); 305 RSGroupInfo targetRSGInfo = null; 306 Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>(); 307 try { 308 targetRSGInfo = RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, tableName) 309 .orElse(defaultInfo); 310 } catch (IOException exp) { 311 LOG.debug("RSGroup information null for region of table " + tableName, exp); 312 } 313 for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) { 314 ServerName currentHostServer = serverRegionMap.getKey(); 315 List<RegionInfo> regionInfoList = serverRegionMap.getValue(); 316 if ( 317 targetRSGInfo == null || !targetRSGInfo.containsServer(currentHostServer.getAddress()) 318 ) { 319 regionInfoList.forEach(regionInfo -> { 320 regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null)); 321 }); 322 } else { 323 correctServerRegion.put(currentHostServer, regionInfoList); 324 } 325 } 326 correctAssignments.put(tableName, correctServerRegion); 327 } 328 329 // Return correct assignments and region movement plan for mis-placed regions together 330 return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>( 331 correctAssignments, regionPlansForMisplacedRegions); 332 } 333 334 @Override 335 public void initialize() throws IOException { 336 if (rsGroupInfoManager == null) { 337 rsGroupInfoManager = masterServices.getRSGroupInfoManager(); 338 if (rsGroupInfoManager == null) { 339 String msg = "RSGroupInfoManager hasn't been initialized"; 340 LOG.error(msg); 341 throw new HBaseIOException(msg); 342 } 343 rsGroupInfoManager.start(); 344 } 345 346 // Create the balancer 347 Configuration conf = masterServices.getConfiguration(); 348 Class<? extends LoadBalancer> balancerClass; 349 @SuppressWarnings("deprecation") 350 String balancerClassName = conf.get(HBASE_RSGROUP_LOADBALANCER_CLASS); 351 if (balancerClassName == null) { 352 balancerClass = conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, 353 LoadBalancerFactory.getDefaultLoadBalancerClass(), LoadBalancer.class); 354 } else { 355 try { 356 balancerClass = Class.forName(balancerClassName).asSubclass(LoadBalancer.class); 357 } catch (ClassNotFoundException e) { 358 throw new IOException(e); 359 } 360 } 361 this.provider = new MasterClusterInfoProvider(masterServices); 362 // avoid infinite nesting 363 if (getClass().isAssignableFrom(balancerClass)) { 364 balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass(); 365 } 366 internalBalancer = ReflectionUtils.newInstance(balancerClass); 367 internalBalancer.setClusterInfoProvider(provider); 368 // special handling for favor node balancers 369 if (internalBalancer instanceof FavoredNodesPromoter) { 370 favoredNodesManager = new FavoredNodesManager(provider); 371 ((FavoredNodesPromoter) internalBalancer).setFavoredNodesManager(favoredNodesManager); 372 } 373 internalBalancer.initialize(); 374 // init fallback groups 375 this.fallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false); 376 } 377 378 public boolean isOnline() { 379 if (this.rsGroupInfoManager == null) { 380 return false; 381 } 382 383 return this.rsGroupInfoManager.isOnline(); 384 } 385 386 public boolean isFallbackEnabled() { 387 return fallbackEnabled; 388 } 389 390 @Override 391 public void regionOnline(RegionInfo regionInfo, ServerName sn) { 392 } 393 394 @Override 395 public void regionOffline(RegionInfo regionInfo) { 396 } 397 398 @Override 399 public synchronized void onConfigurationChange(Configuration conf) { 400 boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false); 401 if (fallbackEnabled != newFallbackEnabled) { 402 LOG.info("Changing the value of {} from {} to {}", FALLBACK_GROUP_ENABLE_KEY, fallbackEnabled, 403 newFallbackEnabled); 404 fallbackEnabled = newFallbackEnabled; 405 } 406 provider.onConfigurationChange(conf); 407 internalBalancer.onConfigurationChange(conf); 408 } 409 410 @Override 411 public void stop(String why) { 412 internalBalancer.stop(why); 413 } 414 415 @Override 416 public boolean isStopped() { 417 return internalBalancer.isStopped(); 418 } 419 420 public LoadBalancer getInternalBalancer() { 421 return internalBalancer; 422 } 423 424 public FavoredNodesManager getFavoredNodesManager() { 425 return favoredNodesManager; 426 } 427 428 @Override 429 public synchronized void postMasterStartupInitialize() { 430 this.internalBalancer.postMasterStartupInitialize(); 431 } 432 433 public void updateBalancerStatus(boolean status) { 434 internalBalancer.updateBalancerStatus(status); 435 } 436 437 private List<ServerName> getFallBackCandidates(List<ServerName> servers) { 438 List<ServerName> serverNames = null; 439 try { 440 RSGroupInfo info = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); 441 serverNames = filterOfflineServers(info, servers); 442 } catch (IOException e) { 443 LOG.error("Failed to get default rsgroup info to fallback", e); 444 } 445 return serverNames == null || serverNames.isEmpty() ? servers : serverNames; 446 } 447 448 @Override 449 public void setClusterInfoProvider(ClusterInfoProvider provider) { 450 throw new UnsupportedOperationException("Just call set master service instead"); 451 } 452}