001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.rsgroup;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeMap;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.ClusterMetrics;
034import org.apache.hadoop.hbase.HBaseIOException;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.constraint.ConstraintException;
040import org.apache.hadoop.hbase.master.LoadBalancer;
041import org.apache.hadoop.hbase.master.MasterServices;
042import org.apache.hadoop.hbase.master.RegionPlan;
043import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
044import org.apache.hadoop.hbase.net.Address;
045import org.apache.hadoop.util.ReflectionUtils;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
051import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
052import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap;
053import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
054import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
055import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
056
/**
 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBASE-6721).
 * It balances regions based on a table's group membership.
 *
 * Most assignment methods contain two exclusive code paths: Online - when the group
 * table is online and Offline - when it is unavailable.
 *
 * During Offline, regions are assigned based on cached information in zookeeper.
 * If that is unavailable (i.e. during bootstrap) then regions are assigned randomly.
 *
 * Once the GROUP table has been assigned, the balancer switches to Online and will then
 * start providing appropriate assignments for user tables.
 *
 */
071@InterfaceAudience.Private
072public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);

  // Configuration supplied through setConf(); also handed to the internal balancer.
  private Configuration config;
  // Cluster metrics supplied through setClusterMetrics(); forwarded to the internal balancer.
  private ClusterMetrics clusterStatus;
  // Master services supplied through setMasterServices(); used to locate the RSGroup endpoint.
  private MasterServices masterServices;
  // Source of group membership; looked up in initialize() unless injected beforehand.
  private volatile RSGroupInfoManager rsGroupInfoManager;
  // Balancer that does the actual work within a single group; created in initialize().
  private LoadBalancer internalBalancer;
080
  /**
   * No-argument constructor that performs no setup of its own.
   * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
   * Collaborators are provided afterwards through the setters and {@link #initialize()}.
   */
  @InterfaceAudience.Private
  public RSGroupBasedLoadBalancer() {}
086
  /**
   * @return the configuration last passed to {@link #setConf(Configuration)}, or
   *         {@code null} if none has been set yet
   */
  @Override
  public Configuration getConf() {
    return config;
  }
091
  /**
   * Stores the configuration; it is read later by {@link #initialize()} to choose and
   * configure the internal balancer.
   *
   * @param conf the configuration to keep
   */
  @Override
  public void setConf(Configuration conf) {
    this.config = conf;
  }
096
  /**
   * Stores the cluster metrics; {@link #initialize()} forwards them to the internal balancer.
   *
   * @param sm the cluster metrics to keep
   */
  @Override
  public void setClusterMetrics(ClusterMetrics sm) {
    this.clusterStatus = sm;
  }
101
  /**
   * Stores the master services; {@link #initialize()} uses them to find the RSGroup
   * coprocessor endpoint and forwards them to the internal balancer.
   *
   * @param masterServices the master services to keep
   */
  @Override
  public void setMasterServices(MasterServices masterServices) {
    this.masterServices = masterServices;
  }
106
  /**
   * Table-scoped variant of balancing. The table name is not consulted here; the call is
   * forwarded unchanged to {@link #balanceCluster(Map)}.
   *
   * @param tableName ignored by this implementation
   * @param clusterState current assignment of regions, keyed by hosting server
   * @return whatever {@link #balanceCluster(Map)} returns for {@code clusterState}
   * @throws HBaseIOException if {@link #balanceCluster(Map)} throws
   */
  @Override
  public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<RegionInfo>>
      clusterState) throws HBaseIOException {
    return balanceCluster(clusterState);
  }
112
113  @Override
114  public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState)
115      throws HBaseIOException {
116    if (!isOnline()) {
117      throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
118          " is not online, unable to perform balance");
119    }
120
121    Map<ServerName,List<RegionInfo>> correctedState = correctAssignments(clusterState);
122    List<RegionPlan> regionPlans = new ArrayList<>();
123
124    List<RegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
125    for (RegionInfo regionInfo : misplacedRegions) {
126      ServerName serverName = findServerForRegion(clusterState, regionInfo);
127      regionPlans.add(new RegionPlan(regionInfo, serverName, null));
128    }
129    try {
130      List<RSGroupInfo> rsgi = rsGroupInfoManager.listRSGroups();
131      for (RSGroupInfo info: rsgi) {
132        Map<ServerName, List<RegionInfo>> groupClusterState = new HashMap<>();
133        Map<TableName, Map<ServerName, List<RegionInfo>>> groupClusterLoad = new HashMap<>();
134        for (Address sName : info.getServers()) {
135          for(ServerName curr: clusterState.keySet()) {
136            if(curr.getAddress().equals(sName)) {
137              groupClusterState.put(curr, correctedState.get(curr));
138            }
139          }
140        }
141        groupClusterLoad.put(HConstants.ENSEMBLE_TABLE_NAME, groupClusterState);
142        this.internalBalancer.setClusterLoad(groupClusterLoad);
143        List<RegionPlan> groupPlans = this.internalBalancer
144            .balanceCluster(groupClusterState);
145        if (groupPlans != null) {
146          regionPlans.addAll(groupPlans);
147        }
148      }
149    } catch (IOException exp) {
150      LOG.warn("Exception while balancing cluster.", exp);
151      regionPlans.clear();
152    }
153    return regionPlans;
154  }
155
156  @Override
157  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(
158      List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
159    Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
160    ListMultimap<String,RegionInfo> regionMap = ArrayListMultimap.create();
161    ListMultimap<String,ServerName> serverMap = ArrayListMultimap.create();
162    generateGroupMaps(regions, servers, regionMap, serverMap);
163    for(String groupKey : regionMap.keySet()) {
164      if (regionMap.get(groupKey).size() > 0) {
165        Map<ServerName, List<RegionInfo>> result =
166            this.internalBalancer.roundRobinAssignment(
167                regionMap.get(groupKey),
168                serverMap.get(groupKey));
169        if(result != null) {
170          if(result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
171              assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)){
172            assignments.get(LoadBalancer.BOGUS_SERVER_NAME).addAll(
173              result.get(LoadBalancer.BOGUS_SERVER_NAME));
174          } else {
175            assignments.putAll(result);
176          }
177        }
178      }
179    }
180    return assignments;
181  }
182
183  @Override
184  public Map<ServerName, List<RegionInfo>> retainAssignment(
185      Map<RegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
186    try {
187      Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
188      ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
189      Set<RegionInfo> misplacedRegions = getMisplacedRegions(regions);
190      for (RegionInfo region : regions.keySet()) {
191        if (!misplacedRegions.contains(region)) {
192          String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
193          if (groupName == null) {
194            LOG.debug("Group not found for table " + region.getTable() + ", using default");
195            groupName = RSGroupInfo.DEFAULT_GROUP;
196          }
197          groupToRegion.put(groupName, region);
198        }
199      }
200      // Now the "groupToRegion" map has only the regions which have correct
201      // assignments.
202      for (String key : groupToRegion.keySet()) {
203        Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>();
204        List<RegionInfo> regionList = groupToRegion.get(key);
205        RSGroupInfo info = rsGroupInfoManager.getRSGroup(key);
206        List<ServerName> candidateList = filterOfflineServers(info, servers);
207        for (RegionInfo region : regionList) {
208          currentAssignmentMap.put(region, regions.get(region));
209        }
210        if(candidateList.size() > 0) {
211          assignments.putAll(this.internalBalancer.retainAssignment(
212              currentAssignmentMap, candidateList));
213        } else{
214          if (LOG.isDebugEnabled()) {
215            LOG.debug("No available server to assign regions: " + regionList.toString());
216          }
217          for(RegionInfo region : regionList) {
218            if (!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
219              assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<>());
220            }
221            assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
222          }
223        }
224      }
225
226      for (RegionInfo region : misplacedRegions) {
227        String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
228        if (groupName == null) {
229          LOG.debug("Group not found for table " + region.getTable() + ", using default");
230          groupName = RSGroupInfo.DEFAULT_GROUP;
231        }
232        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
233        List<ServerName> candidateList = filterOfflineServers(info, servers);
234        ServerName server = this.internalBalancer.randomAssignment(region,
235            candidateList);
236        if (server != null) {
237          if (!assignments.containsKey(server)) {
238            assignments.put(server, new ArrayList<>());
239          }
240          assignments.get(server).add(region);
241        } else {
242          //if not server is available assign to bogus so it ends up in RIT
243          if(!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
244            assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<>());
245          }
246          assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
247        }
248      }
249      return assignments;
250    } catch (IOException e) {
251      throw new HBaseIOException("Failed to do online retain assignment", e);
252    }
253  }
254
255  @Override
256  public ServerName randomAssignment(RegionInfo region,
257      List<ServerName> servers) throws HBaseIOException {
258    ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create();
259    ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
260    generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
261    List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
262    return this.internalBalancer.randomAssignment(region, filteredServers);
263  }
264
265  private void generateGroupMaps(
266    List<RegionInfo> regions,
267    List<ServerName> servers,
268    ListMultimap<String, RegionInfo> regionMap,
269    ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
270    try {
271      for (RegionInfo region : regions) {
272        String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
273        if (groupName == null) {
274          LOG.debug("Group not found for table " + region.getTable() + ", using default");
275          groupName = RSGroupInfo.DEFAULT_GROUP;
276        }
277        regionMap.put(groupName, region);
278      }
279      for (String groupKey : regionMap.keySet()) {
280        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
281        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
282        if(serverMap.get(groupKey).size() < 1) {
283          serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
284        }
285      }
286    } catch(IOException e) {
287      throw new HBaseIOException("Failed to generate group maps", e);
288    }
289  }
290
291  private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
292                                                List<ServerName> onlineServers) {
293    if (RSGroupInfo != null) {
294      return filterServers(RSGroupInfo.getServers(), onlineServers);
295    } else {
296      LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
297      return Collections.EMPTY_LIST;
298    }
299  }
300
301  /**
302   * Filter servers based on the online servers.
303   *
304   * @param servers
305   *          the servers
306   * @param onlineServers
307   *          List of servers which are online.
308   * @return the list
309   */
310  private List<ServerName> filterServers(Set<Address> servers,
311                                         List<ServerName> onlineServers) {
312    /**
313     * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}),
314     * having its contains()'s time complexity as O(logn), which is good enough.
315     * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain
316     * if needed. */
317    ArrayList<ServerName> finalList = new ArrayList<>();
318    for (ServerName onlineServer : onlineServers) {
319      if (servers.contains(onlineServer.getAddress())) {
320        finalList.add(onlineServer);
321      }
322    }
323
324    return finalList;
325  }
326
327  @VisibleForTesting
328  public Set<RegionInfo> getMisplacedRegions(
329      Map<RegionInfo, ServerName> regions) throws IOException {
330    Set<RegionInfo> misplacedRegions = new HashSet<>();
331    for(Map.Entry<RegionInfo, ServerName> region : regions.entrySet()) {
332      RegionInfo regionInfo = region.getKey();
333      ServerName assignedServer = region.getValue();
334      String groupName = rsGroupInfoManager.getRSGroupOfTable(regionInfo.getTable());
335      if (groupName == null) {
336        LOG.debug("Group not found for table " + regionInfo.getTable() + ", using default");
337        groupName = RSGroupInfo.DEFAULT_GROUP;
338      }
339      RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
340      if (assignedServer == null) {
341        LOG.debug("There is no assigned server for {}", region);
342        continue;
343      }
344      RSGroupInfo otherInfo = rsGroupInfoManager.getRSGroupOfServer(assignedServer.getAddress());
345      if (info == null && otherInfo == null) {
346        LOG.warn("Couldn't obtain rs group information for {} on {}", region, assignedServer);
347        continue;
348      }
349      if ((info == null || !info.containsServer(assignedServer.getAddress()))) {
350        LOG.debug("Found misplaced region: " + regionInfo.getRegionNameAsString() +
351            " on server: " + assignedServer +
352            " found in group: " +  otherInfo +
353            " outside of group: " + (info == null ? "UNKNOWN" : info.getName()));
354        misplacedRegions.add(regionInfo);
355      }
356    }
357    return misplacedRegions;
358  }
359
360  private ServerName findServerForRegion(
361      Map<ServerName, List<RegionInfo>> existingAssignments, RegionInfo region) {
362    for (Map.Entry<ServerName, List<RegionInfo>> entry : existingAssignments.entrySet()) {
363      if (entry.getValue().contains(region)) {
364        return entry.getKey();
365      }
366    }
367
368    throw new IllegalStateException("Could not find server for region "
369        + region.getShortNameToLog());
370  }
371
372  private Map<ServerName, List<RegionInfo>> correctAssignments(
373       Map<ServerName, List<RegionInfo>> existingAssignments)
374  throws HBaseIOException{
375    Map<ServerName, List<RegionInfo>> correctAssignments = new TreeMap<>();
376    correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<>());
377    for (Map.Entry<ServerName, List<RegionInfo>> assignments : existingAssignments.entrySet()){
378      ServerName sName = assignments.getKey();
379      correctAssignments.put(sName, new LinkedList<>());
380      List<RegionInfo> regions = assignments.getValue();
381      for (RegionInfo region : regions) {
382        RSGroupInfo targetRSGInfo = null;
383        try {
384          String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
385          if (groupName == null) {
386            LOG.debug("Group not found for table " + region.getTable() + ", using default");
387            groupName = RSGroupInfo.DEFAULT_GROUP;
388          }
389          targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
390        } catch (IOException exp) {
391          LOG.debug("RSGroup information null for region of table " + region.getTable(),
392              exp);
393        }
394        if ((targetRSGInfo == null) || (!targetRSGInfo.containsServer(sName.getAddress()))) {
395          correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
396        } else {
397          correctAssignments.get(sName).add(region);
398        }
399      }
400    }
401    return correctAssignments;
402  }
403
404  @Override
405  public void initialize() throws HBaseIOException {
406    try {
407      if (rsGroupInfoManager == null) {
408        List<RSGroupAdminEndpoint> cps =
409          masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class);
410        if (cps.size() != 1) {
411          String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size();
412          LOG.error(msg);
413          throw new HBaseIOException(msg);
414        }
415        rsGroupInfoManager = cps.get(0).getGroupInfoManager();
416        if(rsGroupInfoManager == null){
417          String msg = "RSGroupInfoManager hasn't been initialized";
418          LOG.error(msg);
419          throw new HBaseIOException(msg);
420        }
421        rsGroupInfoManager.start();
422      }
423    } catch (IOException e) {
424      throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e);
425    }
426
427    // Create the balancer
428    Class<? extends LoadBalancer> balancerKlass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS,
429        StochasticLoadBalancer.class, LoadBalancer.class);
430    internalBalancer = ReflectionUtils.newInstance(balancerKlass, config);
431    internalBalancer.setMasterServices(masterServices);
432    internalBalancer.setClusterMetrics(clusterStatus);
433    internalBalancer.setConf(config);
434    internalBalancer.initialize();
435  }
436
437  public boolean isOnline() {
438    if (this.rsGroupInfoManager == null) {
439      return false;
440    }
441
442    return this.rsGroupInfoManager.isOnline();
443  }
444
  /**
   * Intentionally empty: the supplied load is not stored. {@link #balanceCluster(Map)} builds
   * a per-group load and passes it to the internal balancer itself.
   */
  @Override
  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
  }
448
  /** Intentionally empty: this balancer does nothing when a region comes online. */
  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }
452
  /** Intentionally empty: this balancer does nothing when a region goes offline. */
  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }
456
  /**
   * Configuration change hook; currently a no-op, so neither this balancer nor the internal
   * one is updated from {@code conf} here.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    //DO nothing for now
  }
461
  /**
   * Intentionally empty: nothing is stopped or released here.
   *
   * @param why reason for stopping; ignored
   */
  @Override
  public void stop(String why) {
  }
465
  /**
   * @return always {@code false}; {@link #stop(String)} does not record any state
   */
  @Override
  public boolean isStopped() {
    return false;
  }
470
  /**
   * Injects the group info manager. When a non-null manager is set before
   * {@link #initialize()} runs, initialize() skips the coprocessor lookup and does not start
   * the manager.
   *
   * @param rsGroupInfoManager the manager to use
   */
  @VisibleForTesting
  public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
    this.rsGroupInfoManager = rsGroupInfoManager;
  }
475
  /**
   * Forwards the post-startup hook to the internal balancer. The internal balancer is
   * created in {@link #initialize()}, so that method has to have run first.
   */
  @Override
  public void postMasterStartupInitialize() {
    this.internalBalancer.postMasterStartupInitialize();
  }
480}