/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.rsgroup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 051import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 052import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap; 053import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap; 054import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 055import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 056 057/** 058 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721) 059 * It does region balance based on a table's group membership. 060 * 061 * Most assignment methods contain two exclusive code paths: Online - when the group 062 * table is online and Offline - when it is unavailable. 063 * 064 * During Offline, assignments are assigned based on cached information in zookeeper. 065 * If unavailable (ie bootstrap) then regions are assigned randomly. 066 * 067 * Once the GROUP table has been assigned, the balancer switches to Online and will then 068 * start providing appropriate assignments for user tables. 069 * 070 */ 071@InterfaceAudience.Private 072public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { 073 private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class); 074 075 private Configuration config; 076 private ClusterMetrics clusterStatus; 077 private MasterServices masterServices; 078 private volatile RSGroupInfoManager rsGroupInfoManager; 079 private LoadBalancer internalBalancer; 080 081 /** 082 * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}. 
083 */ 084 @InterfaceAudience.Private 085 public RSGroupBasedLoadBalancer() {} 086 087 @Override 088 public Configuration getConf() { 089 return config; 090 } 091 092 @Override 093 public void setConf(Configuration conf) { 094 this.config = conf; 095 } 096 097 @Override 098 public void setClusterMetrics(ClusterMetrics sm) { 099 this.clusterStatus = sm; 100 } 101 102 @Override 103 public void setMasterServices(MasterServices masterServices) { 104 this.masterServices = masterServices; 105 } 106 107 @Override 108 public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<RegionInfo>> 109 clusterState) throws HBaseIOException { 110 return balanceCluster(clusterState); 111 } 112 113 @Override 114 public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState) 115 throws HBaseIOException { 116 if (!isOnline()) { 117 throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME + 118 " is not online, unable to perform balance"); 119 } 120 121 Map<ServerName,List<RegionInfo>> correctedState = correctAssignments(clusterState); 122 List<RegionPlan> regionPlans = new ArrayList<>(); 123 124 List<RegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME); 125 for (RegionInfo regionInfo : misplacedRegions) { 126 ServerName serverName = findServerForRegion(clusterState, regionInfo); 127 regionPlans.add(new RegionPlan(regionInfo, serverName, null)); 128 } 129 try { 130 List<RSGroupInfo> rsgi = rsGroupInfoManager.listRSGroups(); 131 for (RSGroupInfo info: rsgi) { 132 Map<ServerName, List<RegionInfo>> groupClusterState = new HashMap<>(); 133 Map<TableName, Map<ServerName, List<RegionInfo>>> groupClusterLoad = new HashMap<>(); 134 for (Address sName : info.getServers()) { 135 for(ServerName curr: clusterState.keySet()) { 136 if(curr.getAddress().equals(sName)) { 137 groupClusterState.put(curr, correctedState.get(curr)); 138 } 139 } 140 } 141 groupClusterLoad.put(HConstants.ENSEMBLE_TABLE_NAME, 
groupClusterState); 142 this.internalBalancer.setClusterLoad(groupClusterLoad); 143 List<RegionPlan> groupPlans = this.internalBalancer 144 .balanceCluster(groupClusterState); 145 if (groupPlans != null) { 146 regionPlans.addAll(groupPlans); 147 } 148 } 149 } catch (IOException exp) { 150 LOG.warn("Exception while balancing cluster.", exp); 151 regionPlans.clear(); 152 } 153 return regionPlans; 154 } 155 156 @Override 157 public Map<ServerName, List<RegionInfo>> roundRobinAssignment( 158 List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException { 159 Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap(); 160 ListMultimap<String,RegionInfo> regionMap = ArrayListMultimap.create(); 161 ListMultimap<String,ServerName> serverMap = ArrayListMultimap.create(); 162 generateGroupMaps(regions, servers, regionMap, serverMap); 163 for(String groupKey : regionMap.keySet()) { 164 if (regionMap.get(groupKey).size() > 0) { 165 Map<ServerName, List<RegionInfo>> result = 166 this.internalBalancer.roundRobinAssignment( 167 regionMap.get(groupKey), 168 serverMap.get(groupKey)); 169 if(result != null) { 170 if(result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) && 171 assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)){ 172 assignments.get(LoadBalancer.BOGUS_SERVER_NAME).addAll( 173 result.get(LoadBalancer.BOGUS_SERVER_NAME)); 174 } else { 175 assignments.putAll(result); 176 } 177 } 178 } 179 } 180 return assignments; 181 } 182 183 @Override 184 public Map<ServerName, List<RegionInfo>> retainAssignment( 185 Map<RegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException { 186 try { 187 Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>(); 188 ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create(); 189 Set<RegionInfo> misplacedRegions = getMisplacedRegions(regions); 190 for (RegionInfo region : regions.keySet()) { 191 if (!misplacedRegions.contains(region)) { 192 String groupName = 
rsGroupInfoManager.getRSGroupOfTable(region.getTable()); 193 if (groupName == null) { 194 LOG.debug("Group not found for table " + region.getTable() + ", using default"); 195 groupName = RSGroupInfo.DEFAULT_GROUP; 196 } 197 groupToRegion.put(groupName, region); 198 } 199 } 200 // Now the "groupToRegion" map has only the regions which have correct 201 // assignments. 202 for (String key : groupToRegion.keySet()) { 203 Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>(); 204 List<RegionInfo> regionList = groupToRegion.get(key); 205 RSGroupInfo info = rsGroupInfoManager.getRSGroup(key); 206 List<ServerName> candidateList = filterOfflineServers(info, servers); 207 for (RegionInfo region : regionList) { 208 currentAssignmentMap.put(region, regions.get(region)); 209 } 210 if(candidateList.size() > 0) { 211 assignments.putAll(this.internalBalancer.retainAssignment( 212 currentAssignmentMap, candidateList)); 213 } else{ 214 if (LOG.isDebugEnabled()) { 215 LOG.debug("No available server to assign regions: " + regionList.toString()); 216 } 217 for(RegionInfo region : regionList) { 218 if (!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) { 219 assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<>()); 220 } 221 assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region); 222 } 223 } 224 } 225 226 for (RegionInfo region : misplacedRegions) { 227 String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); 228 if (groupName == null) { 229 LOG.debug("Group not found for table " + region.getTable() + ", using default"); 230 groupName = RSGroupInfo.DEFAULT_GROUP; 231 } 232 RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName); 233 List<ServerName> candidateList = filterOfflineServers(info, servers); 234 ServerName server = this.internalBalancer.randomAssignment(region, 235 candidateList); 236 if (server != null) { 237 if (!assignments.containsKey(server)) { 238 assignments.put(server, new ArrayList<>()); 
239 } 240 assignments.get(server).add(region); 241 } else { 242 //if not server is available assign to bogus so it ends up in RIT 243 if(!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) { 244 assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<>()); 245 } 246 assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region); 247 } 248 } 249 return assignments; 250 } catch (IOException e) { 251 throw new HBaseIOException("Failed to do online retain assignment", e); 252 } 253 } 254 255 @Override 256 public ServerName randomAssignment(RegionInfo region, 257 List<ServerName> servers) throws HBaseIOException { 258 ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create(); 259 ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create(); 260 generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap); 261 List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next()); 262 return this.internalBalancer.randomAssignment(region, filteredServers); 263 } 264 265 private void generateGroupMaps( 266 List<RegionInfo> regions, 267 List<ServerName> servers, 268 ListMultimap<String, RegionInfo> regionMap, 269 ListMultimap<String, ServerName> serverMap) throws HBaseIOException { 270 try { 271 for (RegionInfo region : regions) { 272 String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); 273 if (groupName == null) { 274 LOG.debug("Group not found for table " + region.getTable() + ", using default"); 275 groupName = RSGroupInfo.DEFAULT_GROUP; 276 } 277 regionMap.put(groupName, region); 278 } 279 for (String groupKey : regionMap.keySet()) { 280 RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey); 281 serverMap.putAll(groupKey, filterOfflineServers(info, servers)); 282 if(serverMap.get(groupKey).size() < 1) { 283 serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME); 284 } 285 } 286 } catch(IOException e) { 287 throw new HBaseIOException("Failed to generate group maps", e); 288 } 
289 } 290 291 private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo, 292 List<ServerName> onlineServers) { 293 if (RSGroupInfo != null) { 294 return filterServers(RSGroupInfo.getServers(), onlineServers); 295 } else { 296 LOG.warn("RSGroup Information found to be null. Some regions might be unassigned."); 297 return Collections.EMPTY_LIST; 298 } 299 } 300 301 /** 302 * Filter servers based on the online servers. 303 * 304 * @param servers 305 * the servers 306 * @param onlineServers 307 * List of servers which are online. 308 * @return the list 309 */ 310 private List<ServerName> filterServers(Set<Address> servers, 311 List<ServerName> onlineServers) { 312 /** 313 * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), 314 * having its contains()'s time complexity as O(logn), which is good enough. 315 * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain 316 * if needed. */ 317 ArrayList<ServerName> finalList = new ArrayList<>(); 318 for (ServerName onlineServer : onlineServers) { 319 if (servers.contains(onlineServer.getAddress())) { 320 finalList.add(onlineServer); 321 } 322 } 323 324 return finalList; 325 } 326 327 @VisibleForTesting 328 public Set<RegionInfo> getMisplacedRegions( 329 Map<RegionInfo, ServerName> regions) throws IOException { 330 Set<RegionInfo> misplacedRegions = new HashSet<>(); 331 for(Map.Entry<RegionInfo, ServerName> region : regions.entrySet()) { 332 RegionInfo regionInfo = region.getKey(); 333 ServerName assignedServer = region.getValue(); 334 String groupName = rsGroupInfoManager.getRSGroupOfTable(regionInfo.getTable()); 335 if (groupName == null) { 336 LOG.debug("Group not found for table " + regionInfo.getTable() + ", using default"); 337 groupName = RSGroupInfo.DEFAULT_GROUP; 338 } 339 RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName); 340 if (assignedServer == null) { 341 LOG.debug("There is no assigned server for {}", region); 342 
continue; 343 } 344 RSGroupInfo otherInfo = rsGroupInfoManager.getRSGroupOfServer(assignedServer.getAddress()); 345 if (info == null && otherInfo == null) { 346 LOG.warn("Couldn't obtain rs group information for {} on {}", region, assignedServer); 347 continue; 348 } 349 if ((info == null || !info.containsServer(assignedServer.getAddress()))) { 350 LOG.debug("Found misplaced region: " + regionInfo.getRegionNameAsString() + 351 " on server: " + assignedServer + 352 " found in group: " + otherInfo + 353 " outside of group: " + (info == null ? "UNKNOWN" : info.getName())); 354 misplacedRegions.add(regionInfo); 355 } 356 } 357 return misplacedRegions; 358 } 359 360 private ServerName findServerForRegion( 361 Map<ServerName, List<RegionInfo>> existingAssignments, RegionInfo region) { 362 for (Map.Entry<ServerName, List<RegionInfo>> entry : existingAssignments.entrySet()) { 363 if (entry.getValue().contains(region)) { 364 return entry.getKey(); 365 } 366 } 367 368 throw new IllegalStateException("Could not find server for region " 369 + region.getShortNameToLog()); 370 } 371 372 private Map<ServerName, List<RegionInfo>> correctAssignments( 373 Map<ServerName, List<RegionInfo>> existingAssignments) 374 throws HBaseIOException{ 375 Map<ServerName, List<RegionInfo>> correctAssignments = new TreeMap<>(); 376 correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<>()); 377 for (Map.Entry<ServerName, List<RegionInfo>> assignments : existingAssignments.entrySet()){ 378 ServerName sName = assignments.getKey(); 379 correctAssignments.put(sName, new LinkedList<>()); 380 List<RegionInfo> regions = assignments.getValue(); 381 for (RegionInfo region : regions) { 382 RSGroupInfo targetRSGInfo = null; 383 try { 384 String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); 385 if (groupName == null) { 386 LOG.debug("Group not found for table " + region.getTable() + ", using default"); 387 groupName = RSGroupInfo.DEFAULT_GROUP; 388 } 389 targetRSGInfo = 
rsGroupInfoManager.getRSGroup(groupName); 390 } catch (IOException exp) { 391 LOG.debug("RSGroup information null for region of table " + region.getTable(), 392 exp); 393 } 394 if ((targetRSGInfo == null) || (!targetRSGInfo.containsServer(sName.getAddress()))) { 395 correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region); 396 } else { 397 correctAssignments.get(sName).add(region); 398 } 399 } 400 } 401 return correctAssignments; 402 } 403 404 @Override 405 public void initialize() throws HBaseIOException { 406 try { 407 if (rsGroupInfoManager == null) { 408 List<RSGroupAdminEndpoint> cps = 409 masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class); 410 if (cps.size() != 1) { 411 String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size(); 412 LOG.error(msg); 413 throw new HBaseIOException(msg); 414 } 415 rsGroupInfoManager = cps.get(0).getGroupInfoManager(); 416 if(rsGroupInfoManager == null){ 417 String msg = "RSGroupInfoManager hasn't been initialized"; 418 LOG.error(msg); 419 throw new HBaseIOException(msg); 420 } 421 rsGroupInfoManager.start(); 422 } 423 } catch (IOException e) { 424 throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e); 425 } 426 427 // Create the balancer 428 Class<? 
extends LoadBalancer> balancerKlass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS, 429 StochasticLoadBalancer.class, LoadBalancer.class); 430 internalBalancer = ReflectionUtils.newInstance(balancerKlass, config); 431 internalBalancer.setMasterServices(masterServices); 432 internalBalancer.setClusterMetrics(clusterStatus); 433 internalBalancer.setConf(config); 434 internalBalancer.initialize(); 435 } 436 437 public boolean isOnline() { 438 if (this.rsGroupInfoManager == null) { 439 return false; 440 } 441 442 return this.rsGroupInfoManager.isOnline(); 443 } 444 445 @Override 446 public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) { 447 } 448 449 @Override 450 public void regionOnline(RegionInfo regionInfo, ServerName sn) { 451 } 452 453 @Override 454 public void regionOffline(RegionInfo regionInfo) { 455 } 456 457 @Override 458 public void onConfigurationChange(Configuration conf) { 459 //DO nothing for now 460 } 461 462 @Override 463 public void stop(String why) { 464 } 465 466 @Override 467 public boolean isStopped() { 468 return false; 469 } 470 471 @VisibleForTesting 472 public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) { 473 this.rsGroupInfoManager = rsGroupInfoManager; 474 } 475 476 @Override 477 public void postMasterStartupInitialize() { 478 this.internalBalancer.postMasterStartupInitialize(); 479 } 480}