001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.rsgroup; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.TreeMap; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.ClusterMetrics; 034import org.apache.hadoop.hbase.HBaseIOException; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.constraint.ConstraintException; 040import org.apache.hadoop.hbase.master.LoadBalancer; 041import org.apache.hadoop.hbase.master.MasterServices; 042import org.apache.hadoop.hbase.master.RegionPlan; 043import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer; 044import org.apache.hadoop.hbase.net.Address; 045import org.apache.hadoop.util.ReflectionUtils; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 051import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 052import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap; 053import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap; 054import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 055import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 056 057/** 058 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721) 059 * It does region balance based on a table's group membership. 060 * 061 * Most assignment methods contain two exclusive code paths: Online - when the group 062 * table is online and Offline - when it is unavailable. 063 * 064 * During Offline, assignments are assigned based on cached information in zookeeper. 065 * If unavailable (ie bootstrap) then regions are assigned randomly. 066 * 067 * Once the GROUP table has been assigned, the balancer switches to Online and will then 068 * start providing appropriate assignments for user tables. 069 * 070 */ 071@InterfaceAudience.Private 072public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { 073 private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class); 074 075 private Configuration config; 076 private ClusterMetrics clusterStatus; 077 private MasterServices masterServices; 078 private volatile RSGroupInfoManager rsGroupInfoManager; 079 private LoadBalancer internalBalancer; 080 081 /** 082 * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}. 083 */ 084 @InterfaceAudience.Private 085 public RSGroupBasedLoadBalancer() {} 086 087 @Override 088 public Configuration getConf() { 089 return config; 090 } 091 092 @Override 093 public void setConf(Configuration conf) { 094 this.config = conf; 095 if (internalBalancer != null) { 096 internalBalancer.setConf(conf); 097 } 098 } 099 100 @Override 101 public void setClusterMetrics(ClusterMetrics sm) { 102 this.clusterStatus = sm; 103 if (internalBalancer != null) { 104 internalBalancer.setClusterMetrics(sm); 105 } 106 } 107 108 @Override 109 public void setMasterServices(MasterServices masterServices) { 110 this.masterServices = masterServices; 111 } 112 113 @Override 114 public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<RegionInfo>> 115 clusterState) throws HBaseIOException { 116 return balanceCluster(clusterState); 117 } 118 119 @Override 120 public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState) 121 throws HBaseIOException { 122 if (!isOnline()) { 123 throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME + 124 " is not online, unable to perform balance"); 125 } 126 127 Map<ServerName,List<RegionInfo>> correctedState = correctAssignments(clusterState); 128 List<RegionPlan> regionPlans = new ArrayList<>(); 129 130 List<RegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME); 131 for (RegionInfo regionInfo : misplacedRegions) { 132 ServerName serverName = findServerForRegion(clusterState, regionInfo); 133 regionPlans.add(new RegionPlan(regionInfo, serverName, null)); 134 } 135 try { 136 // Record which region servers have been processed,so as to skip them after processed 137 HashSet<ServerName> processedServers = new HashSet<>(); 138 139 // For each rsgroup 140 for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) { 141 Map<ServerName, List<RegionInfo>> groupClusterState = new HashMap<>(); 142 Map<TableName, Map<ServerName, List<RegionInfo>>> groupClusterLoad = new HashMap<>(); 143 for (ServerName server : clusterState.keySet()) { // for each region server 144 if (!processedServers.contains(server) // server is not processed yet 145 && rsgroup.containsServer(server.getAddress())) { // server belongs to this rsgroup 146 List<RegionInfo> regionsOnServer = correctedState.get(server); 147 groupClusterState.put(server, regionsOnServer); 148 149 processedServers.add(server); 150 } 151 } 152 153 groupClusterLoad.put(HConstants.ENSEMBLE_TABLE_NAME, groupClusterState); 154 this.internalBalancer.setClusterLoad(groupClusterLoad); 155 List<RegionPlan> groupPlans = this.internalBalancer 156 .balanceCluster(groupClusterState); 157 if (groupPlans != null) { 158 regionPlans.addAll(groupPlans); 159 } 160 } 161 } catch (IOException exp) { 162 LOG.warn("Exception while balancing cluster.", exp); 163 regionPlans.clear(); 164 } 165 return regionPlans; 166 } 167 168 @Override 169 public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions, 170 List<ServerName> servers) throws HBaseIOException { 171 Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap(); 172 ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create(); 173 ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create(); 174 generateGroupMaps(regions, servers, regionMap, serverMap); 175 for (String groupKey : regionMap.keySet()) { 176 if (regionMap.get(groupKey).size() > 0) { 177 Map<ServerName, List<RegionInfo>> result = this.internalBalancer 178 .roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey)); 179 if (result != null) { 180 if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) && 181 assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) { 182 assignments.get(LoadBalancer.BOGUS_SERVER_NAME) 183 .addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME)); 184 } else { 185 assignments.putAll(result); 186 } 187 } 188 } 189 } 190 return assignments; 191 } 192 193 @Override 194 public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions, 195 List<ServerName> servers) throws HBaseIOException { 196 try { 197 Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>(); 198 ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create(); 199 for (RegionInfo region : regions.keySet()) { 200 String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); 201 if (groupName == null) { 202 LOG.debug("Group not found for table " + region.getTable() + ", using default"); 203 groupName = RSGroupInfo.DEFAULT_GROUP; 204 } 205 groupToRegion.put(groupName, region); 206 } 207 for (String key : groupToRegion.keySet()) { 208 Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>(); 209 List<RegionInfo> regionList = groupToRegion.get(key); 210 RSGroupInfo info = rsGroupInfoManager.getRSGroup(key); 211 List<ServerName> candidateList = filterOfflineServers(info, servers); 212 for (RegionInfo region : regionList) { 213 currentAssignmentMap.put(region, regions.get(region)); 214 } 215 if (candidateList.size() > 0) { 216 assignments 217 .putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList)); 218 } else { 219 if (LOG.isDebugEnabled()) { 220 LOG.debug("No available servers to assign regions: {}", 221 RegionInfo.getShortNameToLog(regionList)); 222 } 223 assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>()) 224 .addAll(regionList); 225 } 226 } 227 return assignments; 228 } catch (IOException e) { 229 throw new HBaseIOException("Failed to do online retain assignment", e); 230 } 231 } 232 233 @Override 234 public ServerName randomAssignment(RegionInfo region, 235 List<ServerName> servers) throws HBaseIOException { 236 ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create(); 237 ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create(); 238 generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap); 239 List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next()); 240 return this.internalBalancer.randomAssignment(region, filteredServers); 241 } 242 243 private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers, 244 ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap) 245 throws HBaseIOException { 246 try { 247 for (RegionInfo region : regions) { 248 String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); 249 if (groupName == null) { 250 LOG.debug("Group not found for table " + region.getTable() + ", using default"); 251 groupName = RSGroupInfo.DEFAULT_GROUP; 252 } 253 regionMap.put(groupName, region); 254 } 255 for (String groupKey : regionMap.keySet()) { 256 RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey); 257 serverMap.putAll(groupKey, filterOfflineServers(info, servers)); 258 if(serverMap.get(groupKey).size() < 1) { 259 serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME); 260 } 261 } 262 } catch(IOException e) { 263 throw new HBaseIOException("Failed to generate group maps", e); 264 } 265 } 266 267 private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo, 268 List<ServerName> onlineServers) { 269 if (RSGroupInfo != null) { 270 return filterServers(RSGroupInfo.getServers(), onlineServers); 271 } else { 272 LOG.warn("RSGroup Information found to be null. Some regions might be unassigned."); 273 return Collections.emptyList(); 274 } 275 } 276 277 /** 278 * Filter servers based on the online servers. 279 * <p/> 280 * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having 281 * its contains()'s time complexity as O(logn), which is good enough. 282 * <p/> 283 * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if 284 * needed. 285 * @param servers the servers 286 * @param onlineServers List of servers which are online. 287 * @return the list 288 */ 289 private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) { 290 ArrayList<ServerName> finalList = new ArrayList<>(); 291 for (ServerName onlineServer : onlineServers) { 292 if (servers.contains(onlineServer.getAddress())) { 293 finalList.add(onlineServer); 294 } 295 } 296 return finalList; 297 } 298 299 private ServerName findServerForRegion( 300 Map<ServerName, List<RegionInfo>> existingAssignments, RegionInfo region) { 301 for (Map.Entry<ServerName, List<RegionInfo>> entry : existingAssignments.entrySet()) { 302 if (entry.getValue().contains(region)) { 303 return entry.getKey(); 304 } 305 } 306 307 throw new IllegalStateException("Could not find server for region " 308 + region.getShortNameToLog()); 309 } 310 311 private Map<ServerName, List<RegionInfo>> correctAssignments( 312 Map<ServerName, List<RegionInfo>> existingAssignments) throws HBaseIOException{ 313 Map<ServerName, List<RegionInfo>> correctAssignments = new TreeMap<>(); 314 correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<>()); 315 for (Map.Entry<ServerName, List<RegionInfo>> assignments : existingAssignments.entrySet()){ 316 ServerName sName = assignments.getKey(); 317 correctAssignments.put(sName, new LinkedList<>()); 318 List<RegionInfo> regions = assignments.getValue(); 319 for (RegionInfo region : regions) { 320 RSGroupInfo targetRSGInfo = null; 321 try { 322 String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); 323 if (groupName == null) { 324 LOG.debug("Group not found for table " + region.getTable() + ", using default"); 325 groupName = RSGroupInfo.DEFAULT_GROUP; 326 } 327 targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName); 328 } catch (IOException exp) { 329 LOG.debug("RSGroup information null for region of table " + region.getTable(), 330 exp); 331 } 332 if ((targetRSGInfo == null) || (!targetRSGInfo.containsServer(sName.getAddress()))) { 333 correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region); 334 } else { 335 correctAssignments.get(sName).add(region); 336 } 337 } 338 } 339 return correctAssignments; 340 } 341 342 @Override 343 public void initialize() throws HBaseIOException { 344 try { 345 if (rsGroupInfoManager == null) { 346 List<RSGroupAdminEndpoint> cps = 347 masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class); 348 if (cps.size() != 1) { 349 String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size(); 350 LOG.error(msg); 351 throw new HBaseIOException(msg); 352 } 353 rsGroupInfoManager = cps.get(0).getGroupInfoManager(); 354 if(rsGroupInfoManager == null){ 355 String msg = "RSGroupInfoManager hasn't been initialized"; 356 LOG.error(msg); 357 throw new HBaseIOException(msg); 358 } 359 rsGroupInfoManager.start(); 360 } 361 } catch (IOException e) { 362 throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e); 363 } 364 365 // Create the balancer 366 Class<? extends LoadBalancer> balancerKlass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS, 367 StochasticLoadBalancer.class, LoadBalancer.class); 368 internalBalancer = ReflectionUtils.newInstance(balancerKlass, config); 369 internalBalancer.setMasterServices(masterServices); 370 if (clusterStatus != null) { 371 internalBalancer.setClusterMetrics(clusterStatus); 372 } 373 internalBalancer.setConf(config); 374 internalBalancer.initialize(); 375 } 376 377 public boolean isOnline() { 378 if (this.rsGroupInfoManager == null) { 379 return false; 380 } 381 382 return this.rsGroupInfoManager.isOnline(); 383 } 384 385 @Override 386 public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) { 387 } 388 389 @Override 390 public void regionOnline(RegionInfo regionInfo, ServerName sn) { 391 } 392 393 @Override 394 public void regionOffline(RegionInfo regionInfo) { 395 } 396 397 @Override 398 public void onConfigurationChange(Configuration conf) { 399 //DO nothing for now 400 } 401 402 @Override 403 public void stop(String why) { 404 } 405 406 @Override 407 public boolean isStopped() { 408 return false; 409 } 410 411 @VisibleForTesting 412 public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) { 413 this.rsGroupInfoManager = rsGroupInfoManager; 414 } 415 416 @Override 417 public void postMasterStartupInitialize() { 418 this.internalBalancer.postMasterStartupInitialize(); 419 } 420 421 public void updateBalancerStatus(boolean status) { 422 internalBalancer.updateBalancerStatus(status); 423 } 424}