/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.rsgroup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Load balancer used when Region Server Grouping is configured (HBASE-6721).
 * It balances regions based on a table's group membership.
 * <p>
 * Most assignment methods contain two exclusive code paths: Online - when the group
 * table is online and Offline - when it is unavailable.
 * <p>
 * During Offline, assignments are assigned based on cached information in zookeeper.
 * If unavailable (ie bootstrap) then regions are assigned randomly.
 * <p>
 * Once the GROUP table has been assigned, the balancer switches to Online and will then
 * start providing appropriate assignments for user tables.
 * <p>
 * The actual placement decisions inside a group are delegated to an internal
 * {@link LoadBalancer} that is created in {@link #initialize()}.
 */
@InterfaceAudience.Private
public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);

  private Configuration config;
  private ClusterMetrics clusterStatus;
  private MasterServices masterServices;
  private volatile RSGroupInfoManager rsGroupInfoManager;
  private LoadBalancer internalBalancer;

  /**
   * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
   */
  @InterfaceAudience.Private
  public RSGroupBasedLoadBalancer() {}

  /**
   * @return the configuration last passed to {@link #setConf(Configuration)}
   */
  @Override
  public Configuration getConf() {
    return config;
  }

  /**
   * Stores the configuration and forwards it to the internal balancer if that balancer has
   * already been created.
   * @param conf the configuration to use
   */
  @Override
  public void setConf(Configuration conf) {
    this.config = conf;
    if (internalBalancer != null) {
      internalBalancer.setConf(conf);
    }
  }

  /**
   * Stores the cluster metrics and forwards them to the internal balancer if that balancer has
   * already been created.
   * @param sm the cluster metrics to use
   */
  @Override
  public void setClusterMetrics(ClusterMetrics sm) {
    this.clusterStatus = sm;
    if (internalBalancer != null) {
      internalBalancer.setClusterMetrics(sm);
    }
  }

  /**
   * Stores the master services; they are handed to the internal balancer in
   * {@link #initialize()}.
   * @param masterServices the master services to use
   */
  @Override
  public void setMasterServices(MasterServices masterServices) {
    this.masterServices = masterServices;
  }

  /**
   * Override to balance by RSGroup;
   * does not invoke {@link #balanceTable(TableName, Map)}.
   * <p>
   * Regions hosted on servers outside their table's group are first turned into plans with
   * no destination, then the internal balancer is run once per group on the remaining load.
   * @param loadOfAllTable current load, keyed by table and then by hosting server
   * @return the region plans; empty if an {@link IOException} occurred while balancing groups
   * @throws IOException if the rsgroup information is not online
   */
  @Override
  public List<RegionPlan> balanceCluster(
      Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
    if (!isOnline()) {
      throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
          " is not online, unable to perform balance");
    }
    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
      correctedStateAndRegionPlans = correctAssignments(loadOfAllTable);
    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable =
        correctedStateAndRegionPlans.getFirst();
    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
    // Add RegionPlan for the regions which have been placed according to the region server group
    // assignment into the movement list
    try {
      // For each rsgroup
      for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
        Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>();
        for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry :
            correctedLoadOfAllTable.entrySet()) {
          TableName tableName = entry.getKey();
          String targetRSGroupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
          if (targetRSGroupName == null) {
            targetRSGroupName = RSGroupInfo.DEFAULT_GROUP;
          }
          if (targetRSGroupName.equals(rsgroup.getName())) {
            loadOfTablesInGroup.put(tableName, entry.getValue());
          }
        }
        List<RegionPlan> groupPlans = null;
        if (!loadOfTablesInGroup.isEmpty()) {
          LOG.info("Start Generate Balance plan for group: {}", rsgroup.getName());
          groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup);
        }
        if (groupPlans != null) {
          regionPlans.addAll(groupPlans);
        }
      }
    } catch (IOException exp) {
      LOG.warn("Exception while balancing cluster.", exp);
      // Discard everything collected so far, including the plans for mis-placed regions.
      regionPlans.clear();
    }
    return regionPlans;
  }

  /**
   * Assigns the given regions group by group, delegating each group's regions and candidate
   * servers to the internal balancer and merging the per-group results.
   * @param regions the regions to assign
   * @param servers the online servers
   * @return assignments keyed by server; regions of a group without any online server end up
   *         under {@link LoadBalancer#BOGUS_SERVER_NAME}
   * @throws HBaseIOException if the group information cannot be read
   */
  @Override
  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
      List<ServerName> servers) throws HBaseIOException {
    Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
    ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
    ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
    generateGroupMaps(regions, servers, regionMap, serverMap);
    for (String groupKey : regionMap.keySet()) {
      List<RegionInfo> groupRegions = regionMap.get(groupKey);
      if (groupRegions.isEmpty()) {
        continue;
      }
      Map<ServerName, List<RegionInfo>> result =
          this.internalBalancer.roundRobinAssignment(groupRegions, serverMap.get(groupKey));
      // Merge per server so that neither bogus nor regular entries of a group are lost.
      mergeAssignments(assignments, result);
    }
    return assignments;
  }

  /**
   * Assigns the given regions group by group while asking the internal balancer to retain the
   * current hosting server where possible.
   * @param regions the regions to assign, mapped to their current server
   * @param servers the online servers
   * @return assignments keyed by server; regions of a group without any online server end up
   *         under {@link LoadBalancer#BOGUS_SERVER_NAME}
   * @throws HBaseIOException if the group information cannot be read or the internal balancer
   *         fails
   */
  @Override
  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
      List<ServerName> servers) throws HBaseIOException {
    try {
      Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
      ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
      for (RegionInfo region : regions.keySet()) {
        groupToRegion.put(getGroupNameOrDefault(region.getTable()), region);
      }
      for (String key : groupToRegion.keySet()) {
        Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<>();
        List<RegionInfo> regionList = groupToRegion.get(key);
        RSGroupInfo info = rsGroupInfoManager.getRSGroup(key);
        List<ServerName> candidateList = filterOfflineServers(info, servers);
        for (RegionInfo region : regionList) {
          currentAssignmentMap.put(region, regions.get(region));
        }
        if (!candidateList.isEmpty()) {
          Map<ServerName, List<RegionInfo>> retained =
              this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList);
          if (retained == null) {
            // Guarded the same way roundRobinAssignment guards its delegate's result.
            LOG.warn("Internal balancer returned no retain assignment for group {}", key);
          }
          mergeAssignments(assignments, retained);
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("No available servers to assign regions: {}",
                RegionInfo.getShortNameToLog(regionList));
          }
          assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
              .addAll(regionList);
        }
      }
      return assignments;
    } catch (IOException e) {
      throw new HBaseIOException("Failed to do online retain assignment", e);
    }
  }

  /**
   * Picks a server for a single region by delegating to the internal balancer, restricted to
   * the online servers of the region's group.
   * @param region the region to assign
   * @param servers the online servers
   * @return whatever the internal balancer chooses among the group's candidates
   * @throws HBaseIOException if the group information cannot be read
   */
  @Override
  public ServerName randomAssignment(RegionInfo region,
      List<ServerName> servers) throws HBaseIOException {
    ListMultimap<String, RegionInfo> regionMap = LinkedListMultimap.create();
    ListMultimap<String, ServerName> serverMap = LinkedListMultimap.create();
    generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
    // Exactly one region was passed in, so regionMap holds exactly one group key.
    List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
    return this.internalBalancer.randomAssignment(region, filteredServers);
  }

  /**
   * Merges {@code source} into {@code target} server by server. Lists stored in
   * {@code target} are always created here, so lists owned by the producer of {@code source}
   * are never mutated and an existing entry in {@code target} is never overwritten.
   * @param target the accumulated assignments, modified in place
   * @param source assignments to add; may be null, in which case nothing happens
   */
  private static void mergeAssignments(Map<ServerName, List<RegionInfo>> target,
      Map<ServerName, List<RegionInfo>> source) {
    if (source == null) {
      return;
    }
    for (Map.Entry<ServerName, List<RegionInfo>> entry : source.entrySet()) {
      List<RegionInfo> merged = target.computeIfAbsent(entry.getKey(), s -> new ArrayList<>());
      if (entry.getValue() != null) {
        merged.addAll(entry.getValue());
      }
    }
  }

  /**
   * Looks up the rsgroup of a table, falling back to {@link RSGroupInfo#DEFAULT_GROUP} when
   * the manager reports none.
   * @param tableName the table to look up
   * @return the group name, never null
   * @throws IOException if the lookup fails
   */
  private String getGroupNameOrDefault(TableName tableName) throws IOException {
    String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
    if (groupName == null) {
      LOG.debug("Group not found for table {}, using default", tableName);
      groupName = RSGroupInfo.DEFAULT_GROUP;
    }
    return groupName;
  }

  /**
   * Fills {@code regionMap} with the regions keyed by group name and {@code serverMap} with
   * each of those groups' online servers. A group without online servers gets
   * {@link LoadBalancer#BOGUS_SERVER_NAME} as its only candidate.
   * @param regions the regions to classify
   * @param servers the online servers
   * @param regionMap output: group name to regions
   * @param serverMap output: group name to candidate servers
   * @throws HBaseIOException if the group information cannot be read
   */
  private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers,
      ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap)
      throws HBaseIOException {
    try {
      for (RegionInfo region : regions) {
        regionMap.put(getGroupNameOrDefault(region.getTable()), region);
      }
      for (String groupKey : regionMap.keySet()) {
        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
        if (serverMap.get(groupKey).isEmpty()) {
          serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
        }
      }
    } catch (IOException e) {
      throw new HBaseIOException("Failed to generate group maps", e);
    }
  }

  /**
   * Returns the online servers that belong to the given group.
   * @param rsGroupInfo the group, may be null
   * @param onlineServers the servers which are online
   * @return the group's online servers, or an empty list if {@code rsGroupInfo} is null
   */
  private List<ServerName> filterOfflineServers(RSGroupInfo rsGroupInfo,
      List<ServerName> onlineServers) {
    if (rsGroupInfo == null) {
      LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
      return Collections.emptyList();
    }
    return filterServers(rsGroupInfo.getServers(), onlineServers);
  }

  /**
   * Filter servers based on the online servers.
   * <p>
   * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}),
   * having its contains()'s time complexity as O(logn), which is good enough.
   * <p>
   * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
   * needed.
   * @param servers the servers
   * @param onlineServers List of servers which are online.
   * @return the online servers whose address is contained in {@code servers}, in the order of
   *         {@code onlineServers}
   */
  private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
    List<ServerName> finalList = new ArrayList<>();
    for (ServerName onlineServer : onlineServers) {
      if (servers.contains(onlineServer.getAddress())) {
        finalList.add(onlineServer);
      }
    }
    return finalList;
  }

  /**
   * Splits the existing assignments into those hosted on a server of the table's group and
   * those that are not. For every region of the second kind a {@link RegionPlan} with a null
   * destination is produced.
   * @param existingAssignments current load, keyed by table and then by hosting server
   * @return a pair of the load restricted to correctly placed regions and the plans for the
   *         mis-placed regions
   * @throws IOException declared for callers; group lookup failures are logged and cause all
   *         regions of the affected table to be treated as mis-placed
   */
  private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
      correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments)
      throws IOException {
    // To return
    Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>();
    List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments :
        existingAssignments.entrySet()) {
      TableName tableName = assignments.getKey();
      Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue();
      Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>();
      RSGroupInfo targetRSGInfo = null;
      try {
        targetRSGInfo = rsGroupInfoManager.getRSGroup(getGroupNameOrDefault(tableName));
      } catch (IOException exp) {
        LOG.debug("RSGroup information null for region of table {}", tableName, exp);
      }
      for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) {
        ServerName currentHostServer = serverRegionMap.getKey();
        List<RegionInfo> regionInfoList = serverRegionMap.getValue();
        if (targetRSGInfo == null
            || !targetRSGInfo.containsServer(currentHostServer.getAddress())) {
          // Hosted outside the target group (or group unknown): plan a move, destination open.
          for (RegionInfo regionInfo : regionInfoList) {
            regionPlansForMisplacedRegions.add(
                new RegionPlan(regionInfo, currentHostServer, null));
          }
        } else {
          correctServerRegion.put(currentHostServer, regionInfoList);
        }
      }
      correctAssignments.put(tableName, correctServerRegion);
    }
    // Return correct assignments and region movement plan for mis-placed regions together
    return new Pair<>(correctAssignments, regionPlansForMisplacedRegions);
  }

  /**
   * Obtains and starts the {@link RSGroupInfoManager} from the single registered
   * {@link RSGroupAdminEndpoint} (unless one was already set) and then creates, configures and
   * initializes the internal balancer.
   * @throws HBaseIOException if there is not exactly one {@link RSGroupAdminEndpoint}, if its
   *         manager is null, or if starting the manager fails
   */
  @Override
  public void initialize() throws HBaseIOException {
    try {
      if (rsGroupInfoManager == null) {
        List<RSGroupAdminEndpoint> cps =
          masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class);
        if (cps.size() != 1) {
          String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size();
          LOG.error(msg);
          throw new HBaseIOException(msg);
        }
        rsGroupInfoManager = cps.get(0).getGroupInfoManager();
        if (rsGroupInfoManager == null) {
          String msg = "RSGroupInfoManager hasn't been initialized";
          LOG.error(msg);
          throw new HBaseIOException(msg);
        }
        rsGroupInfoManager.start();
      }
    } catch (IOException e) {
      throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e);
    }

    // Create the balancer
    Class<? extends LoadBalancer> balancerClass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS,
        StochasticLoadBalancer.class, LoadBalancer.class);
    if (this.getClass().isAssignableFrom(balancerClass)) {
      LOG.warn("The internal balancer of RSGroupBasedLoadBalancer cannot be itself, " +
              "falling back to the default LoadBalancer class");
      balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass();
    }
    internalBalancer = ReflectionUtils.newInstance(balancerClass, config);
    internalBalancer.setMasterServices(masterServices);
    if (clusterStatus != null) {
      internalBalancer.setClusterMetrics(clusterStatus);
    }
    internalBalancer.setConf(config);
    internalBalancer.initialize();
  }

  /**
   * @return true if an {@link RSGroupInfoManager} is set and reports itself online
   */
  public boolean isOnline() {
    // Read the volatile field once so the null check and the call see the same instance.
    RSGroupInfoManager manager = this.rsGroupInfoManager;
    return manager != null && manager.isOnline();
  }

  /** No-op. */
  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }

  /** No-op. */
  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    //DO nothing for now
  }

  /** No-op. */
  @Override
  public void stop(String why) {
  }

  /**
   * @return always false
   */
  @Override
  public boolean isStopped() {
    return false;
  }

  /**
   * Replaces the {@link RSGroupInfoManager}; when set before {@link #initialize()}, the
   * coprocessor lookup there is skipped.
   * @param rsGroupInfoManager the manager to use
   */
  @VisibleForTesting
  public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
    this.rsGroupInfoManager = rsGroupInfoManager;
  }

  /**
   * Forwards to the internal balancer, which must already have been created by
   * {@link #initialize()}.
   */
  @Override
  public void postMasterStartupInitialize() {
    this.internalBalancer.postMasterStartupInitialize();
  }

  /**
   * Forwards the balancer on/off status to the internal balancer, which must already have been
   * created by {@link #initialize()}.
   * @param status the status to forward
   */
  public void updateBalancerStatus(boolean status) {
    internalBalancer.updateBalancerStatus(status);
  }

  /**
   * can achieve table balanced rather than overall balanced
   * @param tableName the table to balance
   * @param loadOfOneTable current load of that table, keyed by hosting server
   * @return plans for mis-placed regions followed by the internal balancer's plans, or null if
   *         the rsgroup information is not online or the assignments could not be corrected
   */
  @Override
  public List<RegionPlan> balanceTable(TableName tableName,
      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
    if (!isOnline()) {
      LOG.error("{} is not online, unable to perform balanceTable",
          RSGroupInfoManager.class.getSimpleName());
      return null;
    }
    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfThisTable = new HashMap<>();
    loadOfThisTable.put(tableName, loadOfOneTable);
    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
      correctedStateAndRegionPlans;
    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
    try {
      correctedStateAndRegionPlans = correctAssignments(loadOfThisTable);
    } catch (IOException e) {
      LOG.error("get correct assignments and mis-placed regions error ", e);
      return null;
    }
    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfThisTable =
        correctedStateAndRegionPlans.getFirst();
    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
    List<RegionPlan> tablePlans =
        this.internalBalancer.balanceTable(tableName, correctedLoadOfThisTable.get(tableName));

    if (tablePlans != null) {
      regionPlans.addAll(tablePlans);
    }
    return regionPlans;
  }
}