001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.rsgroup; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.TreeMap; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.ClusterMetrics; 033import org.apache.hadoop.hbase.HBaseIOException; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.RegionInfo; 038import org.apache.hadoop.hbase.constraint.ConstraintException; 039import org.apache.hadoop.hbase.master.LoadBalancer; 040import org.apache.hadoop.hbase.master.MasterServices; 041import org.apache.hadoop.hbase.master.RegionPlan; 042import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer; 043import org.apache.hadoop.hbase.net.Address; 044import org.apache.hadoop.util.ReflectionUtils; 045import org.apache.yetus.audience.InterfaceAudience; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 050import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 051import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap; 052import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap; 053import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 054import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 055 056/** 057 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721) 058 * It does region balance based on a table's group membership. 059 * 060 * Most assignment methods contain two exclusive code paths: Online - when the group 061 * table is online and Offline - when it is unavailable. 062 * 063 * During Offline, assignments are assigned based on cached information in zookeeper. 064 * If unavailable (ie bootstrap) then regions are assigned randomly. 065 * 066 * Once the GROUP table has been assigned, the balancer switches to Online and will then 067 * start providing appropriate assignments for user tables. 068 * 069 */ 070@InterfaceAudience.Private 071public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { 072 private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class); 073 074 private Configuration config; 075 private ClusterMetrics clusterStatus; 076 private MasterServices masterServices; 077 private volatile RSGroupInfoManager rsGroupInfoManager; 078 private LoadBalancer internalBalancer; 079 080 /** 081 * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}. 082 */ 083 @InterfaceAudience.Private 084 public RSGroupBasedLoadBalancer() {} 085 086 @Override 087 public Configuration getConf() { 088 return config; 089 } 090 091 @Override 092 public void setConf(Configuration conf) { 093 this.config = conf; 094 } 095 096 @Override 097 public void setClusterMetrics(ClusterMetrics sm) { 098 this.clusterStatus = sm; 099 } 100 101 @Override 102 public void setMasterServices(MasterServices masterServices) { 103 this.masterServices = masterServices; 104 } 105 106 @Override 107 public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<RegionInfo>> 108 clusterState) throws HBaseIOException { 109 return balanceCluster(clusterState); 110 } 111 112 @Override 113 public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState) 114 throws HBaseIOException { 115 if (!isOnline()) { 116 throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME + 117 " is not online, unable to perform balance"); 118 } 119 120 Map<ServerName,List<RegionInfo>> correctedState = correctAssignments(clusterState); 121 List<RegionPlan> regionPlans = new ArrayList<>(); 122 123 List<RegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME); 124 for (RegionInfo regionInfo : misplacedRegions) { 125 ServerName serverName = findServerForRegion(clusterState, regionInfo); 126 regionPlans.add(new RegionPlan(regionInfo, serverName, null)); 127 } 128 try { 129 // Record which region servers have been processed,so as to skip them after processed 130 HashSet<ServerName> processedServers = new HashSet<>(); 131 132 // For each rsgroup 133 for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) { 134 Map<ServerName, List<RegionInfo>> groupClusterState = new HashMap<>(); 135 Map<TableName, Map<ServerName, List<RegionInfo>>> groupClusterLoad = new HashMap<>(); 136 for (ServerName server : clusterState.keySet()) { // for each region server 137 if (!processedServers.contains(server) // server is not processed yet 138 && rsgroup.containsServer(server.getAddress())) { // server belongs to this rsgroup 139 List<RegionInfo> regionsOnServer = correctedState.get(server); 140 groupClusterState.put(server, regionsOnServer); 141 142 processedServers.add(server); 143 } 144 } 145 146 groupClusterLoad.put(HConstants.ENSEMBLE_TABLE_NAME, groupClusterState); 147 this.internalBalancer.setClusterLoad(groupClusterLoad); 148 List<RegionPlan> groupPlans = this.internalBalancer 149 .balanceCluster(groupClusterState); 150 if (groupPlans != null) { 151 regionPlans.addAll(groupPlans); 152 } 153 } 154 } catch (IOException exp) { 155 LOG.warn("Exception while balancing cluster.", exp); 156 regionPlans.clear(); 157 } 158 return regionPlans; 159 } 160 161 @Override 162 public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions, 163 List<ServerName> servers) throws HBaseIOException { 164 Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap(); 165 ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create(); 166 ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create(); 167 generateGroupMaps(regions, servers, regionMap, serverMap); 168 for (String groupKey : regionMap.keySet()) { 169 if (regionMap.get(groupKey).size() > 0) { 170 Map<ServerName, List<RegionInfo>> result = this.internalBalancer 171 .roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey)); 172 if (result != null) { 173 if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) && 174 assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) { 175 assignments.get(LoadBalancer.BOGUS_SERVER_NAME) 176 .addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME)); 177 } else { 178 assignments.putAll(result); 179 } 180 } 181 } 182 } 183 return assignments; 184 } 185 186 @Override 187 public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions, 188 List<ServerName> servers) throws HBaseIOException { 189 try { 190 Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>(); 191 ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create(); 192 for (RegionInfo region : regions.keySet()) { 193 String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); 194 if (groupName == null) { 195 LOG.debug("Group not found for table " + region.getTable() + ", using default"); 196 groupName = RSGroupInfo.DEFAULT_GROUP; 197 } 198 groupToRegion.put(groupName, region); 199 } 200 for (String key : groupToRegion.keySet()) { 201 Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>(); 202 List<RegionInfo> regionList = groupToRegion.get(key); 203 RSGroupInfo info = rsGroupInfoManager.getRSGroup(key); 204 List<ServerName> candidateList = filterOfflineServers(info, servers); 205 for (RegionInfo region : regionList) { 206 currentAssignmentMap.put(region, regions.get(region)); 207 } 208 if (candidateList.size() > 0) { 209 assignments 210 .putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList)); 211 } else { 212 if (LOG.isDebugEnabled()) { 213 LOG.debug("No available servers to assign regions: {}", 214 RegionInfo.getShortNameToLog(regionList)); 215 } 216 assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>()) 217 .addAll(regionList); 218 } 219 } 220 return assignments; 221 } catch (IOException e) { 222 throw new HBaseIOException("Failed to do online retain assignment", e); 223 } 224 } 225 226 @Override 227 public ServerName randomAssignment(RegionInfo region, 228 List<ServerName> servers) throws HBaseIOException { 229 ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create(); 230 ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create(); 231 generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap); 232 List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next()); 233 return this.internalBalancer.randomAssignment(region, filteredServers); 234 } 235 236 private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers, 237 ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap) 238 throws HBaseIOException { 239 try { 240 for (RegionInfo region : regions) { 241 String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); 242 if (groupName == null) { 243 LOG.debug("Group not found for table " + region.getTable() + ", using default"); 244 groupName = RSGroupInfo.DEFAULT_GROUP; 245 } 246 regionMap.put(groupName, region); 247 } 248 for (String groupKey : regionMap.keySet()) { 249 RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey); 250 serverMap.putAll(groupKey, filterOfflineServers(info, servers)); 251 if(serverMap.get(groupKey).size() < 1) { 252 serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME); 253 } 254 } 255 } catch(IOException e) { 256 throw new HBaseIOException("Failed to generate group maps", e); 257 } 258 } 259 260 private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo, 261 List<ServerName> onlineServers) { 262 if (RSGroupInfo != null) { 263 return filterServers(RSGroupInfo.getServers(), onlineServers); 264 } else { 265 LOG.warn("RSGroup Information found to be null. Some regions might be unassigned."); 266 return Collections.emptyList(); 267 } 268 } 269 270 /** 271 * Filter servers based on the online servers. 272 * <p/> 273 * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having 274 * its contains()'s time complexity as O(logn), which is good enough. 275 * <p/> 276 * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if 277 * needed. 278 * @param servers the servers 279 * @param onlineServers List of servers which are online. 280 * @return the list 281 */ 282 private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) { 283 ArrayList<ServerName> finalList = new ArrayList<>(); 284 for (ServerName onlineServer : onlineServers) { 285 if (servers.contains(onlineServer.getAddress())) { 286 finalList.add(onlineServer); 287 } 288 } 289 return finalList; 290 } 291 292 private ServerName findServerForRegion( 293 Map<ServerName, List<RegionInfo>> existingAssignments, RegionInfo region) { 294 for (Map.Entry<ServerName, List<RegionInfo>> entry : existingAssignments.entrySet()) { 295 if (entry.getValue().contains(region)) { 296 return entry.getKey(); 297 } 298 } 299 300 throw new IllegalStateException("Could not find server for region " 301 + region.getShortNameToLog()); 302 } 303 304 private Map<ServerName, List<RegionInfo>> correctAssignments( 305 Map<ServerName, List<RegionInfo>> existingAssignments) 306 throws HBaseIOException{ 307 Map<ServerName, List<RegionInfo>> correctAssignments = new TreeMap<>(); 308 correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<>()); 309 for (Map.Entry<ServerName, List<RegionInfo>> assignments : existingAssignments.entrySet()){ 310 ServerName sName = assignments.getKey(); 311 correctAssignments.put(sName, new LinkedList<>()); 312 List<RegionInfo> regions = assignments.getValue(); 313 for (RegionInfo region : regions) { 314 RSGroupInfo targetRSGInfo = null; 315 try { 316 String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); 317 if (groupName == null) { 318 LOG.debug("Group not found for table " + region.getTable() + ", using default"); 319 groupName = RSGroupInfo.DEFAULT_GROUP; 320 } 321 targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName); 322 } catch (IOException exp) { 323 LOG.debug("RSGroup information null for region of table " + region.getTable(), 324 exp); 325 } 326 if ((targetRSGInfo == null) || (!targetRSGInfo.containsServer(sName.getAddress()))) { 327 correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region); 328 } else { 329 correctAssignments.get(sName).add(region); 330 } 331 } 332 } 333 return correctAssignments; 334 } 335 336 @Override 337 public void initialize() throws HBaseIOException { 338 try { 339 if (rsGroupInfoManager == null) { 340 List<RSGroupAdminEndpoint> cps = 341 masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class); 342 if (cps.size() != 1) { 343 String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size(); 344 LOG.error(msg); 345 throw new HBaseIOException(msg); 346 } 347 rsGroupInfoManager = cps.get(0).getGroupInfoManager(); 348 if(rsGroupInfoManager == null){ 349 String msg = "RSGroupInfoManager hasn't been initialized"; 350 LOG.error(msg); 351 throw new HBaseIOException(msg); 352 } 353 rsGroupInfoManager.start(); 354 } 355 } catch (IOException e) { 356 throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e); 357 } 358 359 // Create the balancer 360 Class<? extends LoadBalancer> balancerKlass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS, 361 StochasticLoadBalancer.class, LoadBalancer.class); 362 internalBalancer = ReflectionUtils.newInstance(balancerKlass, config); 363 internalBalancer.setMasterServices(masterServices); 364 internalBalancer.setClusterMetrics(clusterStatus); 365 internalBalancer.setConf(config); 366 internalBalancer.initialize(); 367 } 368 369 public boolean isOnline() { 370 if (this.rsGroupInfoManager == null) { 371 return false; 372 } 373 374 return this.rsGroupInfoManager.isOnline(); 375 } 376 377 @Override 378 public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) { 379 } 380 381 @Override 382 public void regionOnline(RegionInfo regionInfo, ServerName sn) { 383 } 384 385 @Override 386 public void regionOffline(RegionInfo regionInfo) { 387 } 388 389 @Override 390 public void onConfigurationChange(Configuration conf) { 391 //DO nothing for now 392 } 393 394 @Override 395 public void stop(String why) { 396 } 397 398 @Override 399 public boolean isStopped() { 400 return false; 401 } 402 403 @VisibleForTesting 404 public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) { 405 this.rsGroupInfoManager = rsGroupInfoManager; 406 } 407 408 @Override 409 public void postMasterStartupInitialize() { 410 this.internalBalancer.postMasterStartupInitialize(); 411 } 412 413 public void updateBalancerStatus(boolean status) { 414 internalBalancer.updateBalancerStatus(status); 415 } 416}