/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.rsgroup;

import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.balancer.MasterClusterInfoProvider;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBASE-6721). It does
 * region balance based on a table's group membership.
 * <p/>
 * Most assignment methods contain two exclusive code paths: Online - when the group table is online
 * and Offline - when it is unavailable.
 * <p/>
 * During Offline, assignments are assigned based on cached information in zookeeper. If unavailable
 * (ie bootstrap) then regions are assigned randomly.
 * <p/>
 * Once the GROUP table has been assigned, the balancer switches to Online and will then start
 * providing appropriate assignments for user tables.
 * <p/>
 * Configuration changes that arrive while a balance run is in progress (between
 * {@link #onBalancingStart()} and {@link #onBalancingComplete()}) are deferred and applied once the
 * run completes.
 */
@InterfaceAudience.Private
public class RSGroupBasedLoadBalancer implements LoadBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);

  private MasterServices masterServices;
  private ClusterInfoProvider provider;
  private FavoredNodesManager favoredNodesManager;
  private volatile RSGroupInfoManager rsGroupInfoManager;
  private volatile LoadBalancer internalBalancer;

  /**
   * Tracks whether a balance run is currently in progress.
   */
  private final AtomicBoolean isBalancing = new AtomicBoolean(false);

  /**
   * Holds a configuration update that arrived while a balance run was in progress.
   */
  private final AtomicReference<Configuration> pendingConfiguration = new AtomicReference<>();

  /**
   * Set this key to {@code true} to allow region fallback. Fallback to the default rsgroup first,
   * then fallback to any group if no online servers in default rsgroup. Please keep balancer switch
   * on at the same time, which is relied on to correct misplaced regions
   */
  public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";

  private volatile boolean fallbackEnabled = false;

  /**
   * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
   */
  @InterfaceAudience.Private
  public RSGroupBasedLoadBalancer() {
  }

  /**
   * Forwards the cluster metrics to the internal balancer.
   * <p/>
   * Must be called after calling {@link #initialize()}.
   * @param sm the cluster metrics to hand to the internal balancer
   */
  @Override
  public synchronized void updateClusterMetrics(ClusterMetrics sm) {
    assert internalBalancer != null;
    internalBalancer.updateClusterMetrics(sm);
  }

  /**
   * Forwards the per-table load information to the internal balancer.
   * @param loadOfAllTable region lists keyed by table and then by hosting server
   */
  @Override
  public synchronized void
    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
    internalBalancer.updateBalancerLoadInfo(loadOfAllTable);
  }

  /**
   * Sets the master services this balancer reads its configuration and
   * {@link RSGroupInfoManager} from during {@link #initialize()}.
   * @param masterServices the master services instance
   */
  public void setMasterServices(MasterServices masterServices) {
    this.masterServices = masterServices;
  }

  /**
   * Balance by RSGroup.
   * <p/>
   * Regions hosted on a server outside their table's target group get a {@link RegionPlan} with a
   * {@code null} destination; the remaining load is handed to the internal balancer one group at a
   * time. If an {@link IOException} is raised while iterating the groups, every plan collected so
   * far (including those for misplaced regions) is discarded and an empty list is returned.
   * @param loadOfAllTable region lists keyed by table and then by hosting server
   * @return the region plans to execute, possibly empty
   * @throws IOException a {@link ConstraintException} if the {@link RSGroupInfoManager} is not
   *                     online, or any failure while computing the misplaced regions
   */
  @Override
  public synchronized List<RegionPlan> balanceCluster(
    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
    if (!isOnline()) {
      throw new ConstraintException(
        RSGroupInfoManager.class.getSimpleName() + " is not online, unable to perform balance");
    }

    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>,
      List<RegionPlan>> correctedStateAndRegionPlans = correctAssignments(loadOfAllTable);
    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable =
      correctedStateAndRegionPlans.getFirst();
    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
    RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
    // Add RegionPlan
    // for the regions which have been placed according to the region server group assignment
    // into the movement list
    try {
      // For each rsgroup
      for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
        LOG.debug("Balancing RSGroup={}", rsgroup.getName());
        Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>();
        for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable
          .entrySet()) {
          TableName tableName = entry.getKey();
          RSGroupInfo targetRSGInfo = RSGroupUtil
            .getRSGroupInfo(masterServices, rsGroupInfoManager, tableName).orElse(defaultInfo);
          if (targetRSGInfo.getName().equals(rsgroup.getName())) {
            loadOfTablesInGroup.put(tableName, entry.getValue());
          }
        }
        List<RegionPlan> groupPlans = null;
        if (!loadOfTablesInGroup.isEmpty()) {
          LOG.info("Start Generate Balance plan for group: {}", rsgroup.getName());
          groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup);
        }
        if (groupPlans != null) {
          regionPlans.addAll(groupPlans);
        }
      }
    } catch (IOException exp) {
      LOG.warn("Exception while balancing cluster.", exp);
      regionPlans.clear();
    }

    // Return the whole movement list
    return regionPlans;
  }

  /**
   * Splits the regions by target group and delegates a round-robin assignment of each partition to
   * the internal balancer, merging the per-group results into one map.
   * @param regions the regions to assign
   * @param servers the candidate servers
   * @return the merged assignment, keyed by server
   * @throws IOException if the group partitioning or the internal balancer fails
   */
  @Override
  @NonNull
  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
    List<ServerName> servers) throws IOException {
    Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
      generateGroupAssignments(regions, servers);
    for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
      Map<ServerName, List<RegionInfo>> result =
        this.internalBalancer.roundRobinAssignment(pair.getFirst(), pair.getSecond());
      result.forEach((server, regionInfos) -> assignments
        .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(regionInfos));
    }
    return assignments;
  }

  /**
   * Splits the regions by target group and delegates a retain-assignment of each partition to the
   * internal balancer, merging the per-group results into one map.
   * @param regions the regions to assign, mapped to the server recorded for each of them
   * @param servers the candidate servers
   * @return the merged assignment, keyed by server
   * @throws HBaseIOException wrapping any {@link IOException} raised while partitioning or
   *                          delegating
   */
  @Override
  @NonNull
  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
    List<ServerName> servers) throws HBaseIOException {
    try {
      Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
      List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
        generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers);
      for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
        List<RegionInfo> regionList = pair.getFirst();
        Map<RegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap();
        regionList.forEach(r -> currentAssignmentMap.put(r, regions.get(r)));
        Map<ServerName, List<RegionInfo>> pairResult =
          this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond());
        pairResult.forEach((server, rs) -> assignments
          .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(rs));
      }
      return assignments;
    } catch (IOException e) {
      throw new HBaseIOException("Failed to do online retain assignment", e);
    }
  }

  /**
   * Restricts the candidate servers to those computed for the region's group (or the fallback
   * candidates) and lets the internal balancer pick one of them.
   * @param region  the region to assign
   * @param servers the candidate servers
   * @return whatever the internal balancer's {@code randomAssignment} returns
   * @throws IOException if the group partitioning or the internal balancer fails
   */
  @Override
  public ServerName randomAssignment(RegionInfo region, List<ServerName> servers)
    throws IOException {
    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
      generateGroupAssignments(Lists.newArrayList(region), servers);
    // A single region maps to exactly one group, so exactly one pair is produced.
    List<ServerName> filteredServers = pairs.iterator().next().getSecond();
    return this.internalBalancer.randomAssignment(region, filteredServers);
  }

  /**
   * Partitions the regions by target group and pairs each partition with the given servers that
   * belong to that group.
   * <p/>
   * Regions whose group has none of the given servers are collected into one extra pair. Its
   * servers are the fallback candidates when fallback is enabled and yields any, otherwise the
   * single {@code BOGUS_SERVER_NAME}.
   * @param regions the regions to partition
   * @param servers the candidate servers
   * @return one (regions, servers) pair per group that has servers, plus at most one fallback pair
   * @throws HBaseIOException wrapping any {@link IOException} raised while looking up group info
   */
  private List<Pair<List<RegionInfo>, List<ServerName>>> generateGroupAssignments(
    List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
    try {
      ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
      ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
      RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
      for (RegionInfo region : regions) {
        String groupName =
          RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable())
            .orElse(defaultInfo).getName();
        regionMap.put(groupName, region);
      }
      for (String groupKey : regionMap.keySet()) {
        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
      }

      List<Pair<List<RegionInfo>, List<ServerName>>> result = Lists.newArrayList();
      List<RegionInfo> fallbackRegions = Lists.newArrayList();
      for (String groupKey : regionMap.keySet()) {
        if (serverMap.get(groupKey).isEmpty()) {
          fallbackRegions.addAll(regionMap.get(groupKey));
        } else {
          result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey)));
        }
      }
      if (!fallbackRegions.isEmpty()) {
        List<ServerName> candidates = null;
        if (isFallbackEnabled()) {
          // Guarded because building the region-name set is not free.
          if (LOG.isDebugEnabled()) {
            LOG.debug("Falling back {} regions to servers outside their RSGroup. Regions: {}",
              fallbackRegions.size(), fallbackRegions.stream()
                .map(RegionInfo::getRegionNameAsString).collect(Collectors.toSet()));
          }
          candidates = getFallBackCandidates(servers);
        }
        candidates = (candidates == null || candidates.isEmpty())
          ? Lists.newArrayList(BOGUS_SERVER_NAME)
          : candidates;
        result.add(Pair.newPair(fallbackRegions, candidates));
      }
      return result;
    } catch (IOException e) {
      throw new HBaseIOException("Failed to generate group assignments", e);
    }
  }

  /**
   * Returns the online servers that belong to the given group.
   * @param rsGroupInfo   the group, may be {@code null}
   * @param onlineServers list of servers which are online
   * @return the online servers of the group, or an empty list if the group is {@code null}
   */
  private List<ServerName> filterOfflineServers(RSGroupInfo rsGroupInfo,
    List<ServerName> onlineServers) {
    if (rsGroupInfo != null) {
      return filterServers(rsGroupInfo.getServers(), onlineServers);
    } else {
      LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
      return Collections.emptyList();
    }
  }

  /**
   * Filter servers based on the online servers.
   * <p/>
   * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
   * its contains()'s time complexity as O(logn), which is good enough.
   * <p/>
   * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
   * needed.
   * @param servers       the servers
   * @param onlineServers List of servers which are online.
   * @return the online servers whose address is contained in {@code servers}, in the order of
   *         {@code onlineServers}
   */
  private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
    List<ServerName> finalList = new ArrayList<>();
    for (ServerName onlineServer : onlineServers) {
      if (servers.contains(onlineServer.getAddress())) {
        finalList.add(onlineServer);
      }
    }
    return finalList;
  }

  /**
   * Splits the existing assignments into the part that already respects group membership and a
   * list of plans for the regions that do not.
   * <p/>
   * A region counts as misplaced when its table's target group could not be resolved or does not
   * contain the hosting server; such regions get a {@link RegionPlan} with a {@code null}
   * destination.
   * @param existingAssignments region lists keyed by table and then by hosting server
   * @return a pair of (correctly placed assignments per table, plans for misplaced regions)
   * @throws IOException if the default group cannot be read
   */
  private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
    correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments)
      throws IOException {
    // To return
    Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>();
    List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
    RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments
      .entrySet()) {
      TableName tableName = assignments.getKey();
      Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue();
      RSGroupInfo targetRSGInfo = null;
      Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>();
      try {
        targetRSGInfo = RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, tableName)
          .orElse(defaultInfo);
      } catch (IOException exp) {
        // Leave targetRSGInfo null: every region of this table is then treated as misplaced.
        LOG.debug("RSGroup information null for region of table {}", tableName, exp);
      }
      for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) {
        ServerName currentHostServer = serverRegionMap.getKey();
        List<RegionInfo> regionInfoList = serverRegionMap.getValue();
        if (
          targetRSGInfo == null || !targetRSGInfo.containsServer(currentHostServer.getAddress())
        ) {
          for (RegionInfo regionInfo : regionInfoList) {
            regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null));
          }
        } else {
          correctServerRegion.put(currentHostServer, regionInfoList);
        }
      }
      correctAssignments.put(tableName, correctServerRegion);
    }

    // Return correct assignments and region movement plan for mis-placed regions together
    return new Pair<>(correctAssignments, regionPlansForMisplacedRegions);
  }

  /**
   * Starts the {@link RSGroupInfoManager} (if not already obtained), instantiates and initializes
   * the internal balancer, and reads {@link #FALLBACK_GROUP_ENABLE_KEY}.
   * <p/>
   * The internal balancer class is taken from {@code HBASE_RSGROUP_LOADBALANCER_CLASS} if set,
   * otherwise from {@link HConstants#HBASE_MASTER_LOADBALANCER_CLASS}. If the resolved class is
   * this class or a superclass of it, the factory default is used instead.
   * @throws IOException if the {@link RSGroupInfoManager} is unavailable, the configured balancer
   *                     class cannot be found, or the internal balancer fails to initialize
   */
  @Override
  public void initialize() throws IOException {
    if (rsGroupInfoManager == null) {
      rsGroupInfoManager = masterServices.getRSGroupInfoManager();
      if (rsGroupInfoManager == null) {
        String msg = "RSGroupInfoManager hasn't been initialized";
        LOG.error(msg);
        throw new HBaseIOException(msg);
      }
      rsGroupInfoManager.start();
    }

    // Create the balancer
    Configuration conf = masterServices.getConfiguration();
    Class<? extends LoadBalancer> balancerClass;
    @SuppressWarnings("deprecation")
    String balancerClassName = conf.get(HBASE_RSGROUP_LOADBALANCER_CLASS);
    if (balancerClassName == null) {
      balancerClass = conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
        LoadBalancerFactory.getDefaultLoadBalancerClass(), LoadBalancer.class);
    } else {
      try {
        balancerClass = Class.forName(balancerClassName).asSubclass(LoadBalancer.class);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }
    }
    this.provider = new MasterClusterInfoProvider(masterServices);
    // avoid infinite nesting
    if (getClass().isAssignableFrom(balancerClass)) {
      balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass();
    }
    internalBalancer = ReflectionUtils.newInstance(balancerClass);
    internalBalancer.setClusterInfoProvider(provider);
    // special handling for favor node balancers
    if (internalBalancer instanceof FavoredNodesPromoter) {
      favoredNodesManager = new FavoredNodesManager(provider);
      ((FavoredNodesPromoter) internalBalancer).setFavoredNodesManager(favoredNodesManager);
    }
    internalBalancer.initialize();
    // init fallback groups
    this.fallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
  }

  /**
   * @return {@code true} if the {@link RSGroupInfoManager} has been obtained and reports itself
   *         online
   */
  public boolean isOnline() {
    if (this.rsGroupInfoManager == null) {
      return false;
    }

    return this.rsGroupInfoManager.isOnline();
  }

  /**
   * @return the current value read from {@link #FALLBACK_GROUP_ENABLE_KEY}
   */
  public boolean isFallbackEnabled() {
    return fallbackEnabled;
  }

  /** No-op. */
  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }

  /** No-op. */
  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }

  /**
   * Applies the configuration immediately, or defers it if a balance run is in progress. A deferred
   * configuration replaces any previously deferred one and is applied by
   * {@link #onBalancingComplete()}.
   * @param conf the new configuration
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    // Refer to HBASE-29933
    synchronized (this) {
      // If balance is running, store configuration in pendingConfiguration and return immediately.
      // Defer the config update.
      if (isBalancing.get()) {
        LOG.debug(
          "Balance is in progress, defer applying configuration change until balance completed.");
        pendingConfiguration.set(conf);
      } else {
        // Apply configuration change immediately.
        updateConfiguration(conf);
      }
    }
  }

  /**
   * Applies the given configuration: refreshes the fallback switch and forwards the change to the
   * cluster info provider and the internal balancer.
   * <p/>
   * Callers in this class invoke it while holding this instance's monitor so that two
   * configurations are never applied concurrently.
   */
  private void updateConfiguration(Configuration conf) {
    boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
    if (fallbackEnabled != newFallbackEnabled) {
      LOG.info("Changing the value of {} from {} to {}", FALLBACK_GROUP_ENABLE_KEY, fallbackEnabled,
        newFallbackEnabled);
      fallbackEnabled = newFallbackEnabled;
    }
    provider.onConfigurationChange(conf);
    internalBalancer.onConfigurationChange(conf);
  }

  /**
   * If a pending configuration was stored during a balance run, apply it and clear the pending
   * reference.
   * <p/>
   * Runs under the same monitor as {@link #onConfigurationChange(Configuration)} so that a deferred
   * configuration cannot be applied concurrently with, or after, a newer one.
   */
  public synchronized void applyPendingConfiguration() {
    Configuration toApply = pendingConfiguration.getAndSet(null);
    if (toApply != null) {
      LOG.info("Applying pending configuration after balance completed.");
      updateConfiguration(toApply);
    }
  }

  /**
   * Sets {@link #isBalancing} to {@code true} before a balance run starts.
   */
  @Override
  public void onBalancingStart() {
    LOG.debug("setting isBalancing to true as balance is starting");
    isBalancing.set(true);
  }

  /**
   * Sets {@link #isBalancing} to {@code false} after a balance run completes and applies any
   * pending configuration that arrived during balancing.
   */
  @Override
  public void onBalancingComplete() {
    LOG.debug("setting isBalancing to false as balance is completed");
    // Flip the flag and drain the pending configuration atomically with respect to
    // onConfigurationChange; otherwise a change that observed isBalancing == true could be stored
    // after the drain and stay unapplied.
    synchronized (this) {
      isBalancing.set(false);
      applyPendingConfiguration();
    }
  }

  /**
   * Pauses the calling thread for the duration the internal balancer requests for the given plan.
   * Does nothing if that duration is not positive.
   * @param plan the region plan about to be executed
   * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
   *                          while waiting; the interrupt status is restored first
   */
  @Override
  public void throttle(RegionPlan plan) throws Exception {
    long throttlingTime = internalBalancer.getThrottleDurationMs(plan);
    if (throttlingTime <= 0) {
      return;
    }
    // Object.wait requires the caller to own the monitor; acquire it here (reentrant if the caller
    // already holds it) instead of relying on every caller to do so.
    synchronized (this) {
      try {
        // Release the monitor while waiting to avoid blocking other threads.
        wait(throttlingTime);
      } catch (InterruptedException e) {
        // Preserve the interrupt for code further up the stack.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Stops the internal balancer.
   * @param why the reason passed on to the internal balancer
   */
  @Override
  public void stop(String why) {
    internalBalancer.stop(why);
  }

  /**
   * @return whether the internal balancer reports itself stopped
   */
  @Override
  public boolean isStopped() {
    return internalBalancer.isStopped();
  }

  /**
   * @return the balancer this class delegates to, or {@code null} before {@link #initialize()}
   */
  public LoadBalancer getInternalBalancer() {
    return internalBalancer;
  }

  /**
   * @return the favored nodes manager, or {@code null} unless {@link #initialize()} found the
   *         internal balancer to be a {@link FavoredNodesPromoter}
   */
  public FavoredNodesManager getFavoredNodesManager() {
    return favoredNodesManager;
  }

  /**
   * Forwards the post-startup hook to the internal balancer.
   */
  @Override
  public synchronized void postMasterStartupInitialize() {
    this.internalBalancer.postMasterStartupInitialize();
  }

  /**
   * Forwards the balancer on/off status to the internal balancer.
   * @param status the new balancer status
   */
  public void updateBalancerStatus(boolean status) {
    internalBalancer.updateBalancerStatus(status);
  }

  /**
   * Picks the servers used for regions whose own group has no online server: the online servers of
   * the default group, or all given servers if the default group has none or cannot be read.
   * @param servers the candidate servers
   * @return the fallback candidates
   */
  private List<ServerName> getFallBackCandidates(List<ServerName> servers) {
    List<ServerName> serverNames = null;
    try {
      RSGroupInfo info = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
      serverNames = filterOfflineServers(info, servers);
    } catch (IOException e) {
      LOG.error("Failed to get default rsgroup info to fallback", e);
    }
    return serverNames == null || serverNames.isEmpty() ? servers : serverNames;
  }

  /**
   * Not supported; the provider is created from the master services in {@link #initialize()}.
   * @throws UnsupportedOperationException always
   */
  @Override
  public void setClusterInfoProvider(ClusterInfoProvider provider) {
    throw new UnsupportedOperationException("Just call set master service instead");
  }
}