001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.rsgroup; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import edu.umd.cs.findbugs.annotations.NonNull; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.Optional; 029import java.util.Set; 030import java.util.TreeMap; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.ClusterMetrics; 033import org.apache.hadoop.hbase.HBaseIOException; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.constraint.ConstraintException; 038import org.apache.hadoop.hbase.master.LoadBalancer; 039import org.apache.hadoop.hbase.master.MasterServices; 040import org.apache.hadoop.hbase.master.RegionPlan; 041import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; 042import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer; 043import org.apache.hadoop.hbase.net.Address; 044import org.apache.hadoop.hbase.util.Pair; 045import org.apache.hadoop.hbase.util.ReflectionUtils; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 051import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap; 052import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 053import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 054 055/** 056 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721) It does 057 * region balance based on a table's group membership. Most assignment methods contain two exclusive 058 * code paths: Online - when the group table is online and Offline - when it is unavailable. During 059 * Offline, assignments are assigned based on cached information in zookeeper. If unavailable (ie 060 * bootstrap) then regions are assigned randomly. Once the GROUP table has been assigned, the 061 * balancer switches to Online and will then start providing appropriate assignments for user 062 * tables. 063 */ 064@InterfaceAudience.Private 065public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { 066 private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class); 067 068 private MasterServices masterServices; 069 private volatile RSGroupInfoManager rsGroupInfoManager; 070 private volatile LoadBalancer internalBalancer; 071 072 /** 073 * Set this key to {@code true} to allow region fallback. Fallback to the default rsgroup first, 074 * then fallback to any group if no online servers in default rsgroup. Please keep balancer switch 075 * on at the same time, which is relied on to correct misplaced regions 076 */ 077 public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable"; 078 079 private volatile boolean fallbackEnabled = false; 080 081 /** 082 * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}. 083 */ 084 @InterfaceAudience.Private 085 public RSGroupBasedLoadBalancer() { 086 } 087 088 // must be called after calling initialize 089 @Override 090 public synchronized void updateClusterMetrics(ClusterMetrics sm) { 091 assert internalBalancer != null; 092 internalBalancer.updateClusterMetrics(sm); 093 } 094 095 @Override 096 public synchronized void 097 updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) { 098 internalBalancer.updateBalancerLoadInfo(loadOfAllTable); 099 } 100 101 public void setMasterServices(MasterServices masterServices) { 102 this.masterServices = masterServices; 103 } 104 105 @RestrictedApi(explanation = "Should only be called in tests", link = "", 106 allowedOnPath = ".*/src/test/.*") 107 public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) { 108 this.rsGroupInfoManager = rsGroupInfoManager; 109 } 110 111 /** 112 * Balance by RSGroup. 113 */ 114 @Override 115 public synchronized List<RegionPlan> balanceCluster( 116 Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException { 117 if (!isOnline()) { 118 throw new ConstraintException( 119 RSGroupInfoManager.RSGROUP_TABLE_NAME + " is not online, unable to perform balance"); 120 } 121 // Calculate correct assignments and a list of RegionPlan for mis-placed regions 122 Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, 123 List<RegionPlan>> correctedStateAndRegionPlans = correctAssignments(loadOfAllTable); 124 Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable = 125 correctedStateAndRegionPlans.getFirst(); 126 List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond(); 127 // Add RegionPlan for the regions which have been placed according to the region server group 128 // assignment into the movement list 129 try { 130 // For each rsgroup 131 for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) { 132 Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>(); 133 for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable 134 .entrySet()) { 135 TableName tableName = entry.getKey(); 136 String targetRSGroupName = rsGroupInfoManager.getRSGroupOfTable(tableName); 137 if (targetRSGroupName == null) { 138 targetRSGroupName = RSGroupInfo.DEFAULT_GROUP; 139 } 140 if (targetRSGroupName.equals(rsgroup.getName())) { 141 loadOfTablesInGroup.put(tableName, entry.getValue()); 142 } 143 } 144 List<RegionPlan> groupPlans = null; 145 if (!loadOfTablesInGroup.isEmpty()) { 146 LOG.info("Start Generate Balance plan for group: " + rsgroup.getName()); 147 groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup); 148 } 149 if (groupPlans != null) { 150 regionPlans.addAll(groupPlans); 151 } 152 } 153 } catch (IOException exp) { 154 LOG.warn("Exception while balancing cluster.", exp); 155 regionPlans.clear(); 156 } 157 return regionPlans; 158 } 159 160 @Override 161 @NonNull 162 public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions, 163 List<ServerName> servers) throws HBaseIOException { 164 Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap(); 165 List<Pair<List<RegionInfo>, List<ServerName>>> pairs = 166 generateGroupAssignments(regions, servers); 167 for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) { 168 Map<ServerName, List<RegionInfo>> result = 169 this.internalBalancer.roundRobinAssignment(pair.getFirst(), pair.getSecond()); 170 result.forEach((server, regionInfos) -> assignments 171 .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(regionInfos)); 172 } 173 return assignments; 174 } 175 176 @Override 177 @NonNull 178 public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions, 179 List<ServerName> servers) throws HBaseIOException { 180 try { 181 Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>(); 182 List<Pair<List<RegionInfo>, List<ServerName>>> pairs = 183 generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers); 184 for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) { 185 List<RegionInfo> regionList = pair.getFirst(); 186 Map<RegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap(); 187 regionList.forEach(r -> currentAssignmentMap.put(r, regions.get(r))); 188 Map<ServerName, List<RegionInfo>> pairResult = 189 this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond()); 190 pairResult.forEach((server, rs) -> assignments 191 .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(rs)); 192 } 193 return assignments; 194 } catch (IOException e) { 195 throw new HBaseIOException("Failed to do online retain assignment", e); 196 } 197 } 198 199 @Override 200 public ServerName randomAssignment(RegionInfo region, List<ServerName> servers) 201 throws HBaseIOException { 202 List<Pair<List<RegionInfo>, List<ServerName>>> pairs = 203 generateGroupAssignments(Lists.newArrayList(region), servers); 204 List<ServerName> filteredServers = pairs.iterator().next().getSecond(); 205 return this.internalBalancer.randomAssignment(region, filteredServers); 206 } 207 208 private List<Pair<List<RegionInfo>, List<ServerName>>> generateGroupAssignments( 209 List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException { 210 try { 211 ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create(); 212 ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create(); 213 RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); 214 for (RegionInfo region : regions) { 215 String groupName = 216 Optional.ofNullable(rsGroupInfoManager.getRSGroupOfTable(region.getTable())) 217 .orElse(defaultInfo.getName()); 218 regionMap.put(groupName, region); 219 } 220 for (String groupKey : regionMap.keySet()) { 221 RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey); 222 serverMap.putAll(groupKey, filterOfflineServers(info, servers)); 223 } 224 225 List<Pair<List<RegionInfo>, List<ServerName>>> result = Lists.newArrayList(); 226 List<RegionInfo> fallbackRegions = Lists.newArrayList(); 227 for (String groupKey : regionMap.keySet()) { 228 if (serverMap.get(groupKey).isEmpty()) { 229 fallbackRegions.addAll(regionMap.get(groupKey)); 230 } else { 231 result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey))); 232 } 233 } 234 if (!fallbackRegions.isEmpty()) { 235 List<ServerName> candidates = null; 236 if (isFallbackEnabled()) { 237 candidates = getFallBackCandidates(servers); 238 } 239 candidates = (candidates == null || candidates.isEmpty()) 240 ? Lists.newArrayList(BOGUS_SERVER_NAME) 241 : candidates; 242 result.add(Pair.newPair(fallbackRegions, candidates)); 243 } 244 return result; 245 } catch (IOException e) { 246 throw new HBaseIOException("Failed to generate group assignments", e); 247 } 248 } 249 250 private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo, 251 List<ServerName> onlineServers) { 252 if (RSGroupInfo != null) { 253 return filterServers(RSGroupInfo.getServers(), onlineServers); 254 } else { 255 LOG.warn("RSGroup Information found to be null. Some regions might be unassigned."); 256 return Collections.emptyList(); 257 } 258 } 259 260 /** 261 * Filter servers based on the online servers. 262 * <p/> 263 * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having 264 * its contains()'s time complexity as O(logn), which is good enough. 265 * <p/> 266 * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if 267 * needed. 268 * @param servers the servers 269 * @param onlineServers List of servers which are online. 270 * @return the list 271 */ 272 private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) { 273 ArrayList<ServerName> finalList = new ArrayList<>(); 274 for (ServerName onlineServer : onlineServers) { 275 if (servers.contains(onlineServer.getAddress())) { 276 finalList.add(onlineServer); 277 } 278 } 279 return finalList; 280 } 281 282 private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>> 283 correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments) 284 throws IOException { 285 // To return 286 Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>(); 287 List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>(); 288 for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments 289 .entrySet()) { 290 TableName tableName = assignments.getKey(); 291 Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue(); 292 Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>(); 293 RSGroupInfo targetRSGInfo = null; 294 try { 295 String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName); 296 if (groupName == null) { 297 LOG.debug("Group not found for table " + tableName + ", using default"); 298 groupName = RSGroupInfo.DEFAULT_GROUP; 299 } 300 targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName); 301 } catch (IOException exp) { 302 LOG.debug("RSGroup information null for region of table " + tableName, exp); 303 } 304 for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) { 305 ServerName currentHostServer = serverRegionMap.getKey(); 306 List<RegionInfo> regionInfoList = serverRegionMap.getValue(); 307 if ( 308 targetRSGInfo == null || !targetRSGInfo.containsServer(currentHostServer.getAddress()) 309 ) { 310 regionInfoList.forEach(regionInfo -> { 311 regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null)); 312 }); 313 } else { 314 correctServerRegion.put(currentHostServer, regionInfoList); 315 } 316 } 317 correctAssignments.put(tableName, correctServerRegion); 318 } 319 // Return correct assignments and region movement plan for mis-placed regions together 320 return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>( 321 correctAssignments, regionPlansForMisplacedRegions); 322 } 323 324 @Override 325 public void initialize() throws HBaseIOException { 326 try { 327 if (rsGroupInfoManager == null) { 328 List<RSGroupAdminEndpoint> cps = 329 masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class); 330 if (cps.size() != 1) { 331 String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size(); 332 LOG.error(msg); 333 throw new HBaseIOException(msg); 334 } 335 rsGroupInfoManager = cps.get(0).getGroupInfoManager(); 336 if (rsGroupInfoManager == null) { 337 String msg = "RSGroupInfoManager hasn't been initialized"; 338 LOG.error(msg); 339 throw new HBaseIOException(msg); 340 } 341 rsGroupInfoManager.start(); 342 } 343 } catch (IOException e) { 344 throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e); 345 } 346 347 Configuration conf = masterServices.getConfiguration(); 348 // Create the balancer 349 Class<? extends LoadBalancer> balancerClass = conf.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS, 350 StochasticLoadBalancer.class, LoadBalancer.class); 351 if (this.getClass().isAssignableFrom(balancerClass)) { 352 LOG.warn("The internal balancer of RSGroupBasedLoadBalancer cannot be itself, " 353 + "falling back to the default LoadBalancer class"); 354 balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass(); 355 } 356 internalBalancer = ReflectionUtils.newInstance(balancerClass); 357 internalBalancer.setMasterServices(masterServices); 358 internalBalancer.initialize(); 359 // init fallback groups 360 this.fallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false); 361 } 362 363 public boolean isOnline() { 364 if (this.rsGroupInfoManager == null) { 365 return false; 366 } 367 368 return this.rsGroupInfoManager.isOnline(); 369 } 370 371 public boolean isFallbackEnabled() { 372 return fallbackEnabled; 373 } 374 375 @Override 376 public void regionOnline(RegionInfo regionInfo, ServerName sn) { 377 } 378 379 @Override 380 public void regionOffline(RegionInfo regionInfo) { 381 } 382 383 @Override 384 public synchronized void onConfigurationChange(Configuration conf) { 385 boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false); 386 if (fallbackEnabled != newFallbackEnabled) { 387 LOG.info("Changing the value of {} from {} to {}", FALLBACK_GROUP_ENABLE_KEY, fallbackEnabled, 388 newFallbackEnabled); 389 fallbackEnabled = newFallbackEnabled; 390 } 391 internalBalancer.onConfigurationChange(conf); 392 } 393 394 @Override 395 public void stop(String why) { 396 internalBalancer.stop(why); 397 } 398 399 @Override 400 public boolean isStopped() { 401 return internalBalancer.isStopped(); 402 } 403 404 @Override 405 public synchronized void postMasterStartupInitialize() { 406 this.internalBalancer.postMasterStartupInitialize(); 407 } 408 409 public void updateBalancerStatus(boolean status) { 410 internalBalancer.updateBalancerStatus(status); 411 } 412 413 private List<ServerName> getFallBackCandidates(List<ServerName> servers) { 414 List<ServerName> serverNames = null; 415 try { 416 RSGroupInfo info = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); 417 serverNames = filterOfflineServers(info, servers); 418 } catch (IOException e) { 419 LOG.error("Failed to get default rsgroup info to fallback", e); 420 } 421 return serverNames == null || serverNames.isEmpty() ? servers : serverNames; 422 } 423}