001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.rsgroup;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeMap;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.HBaseIOException;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.constraint.ConstraintException;
039import org.apache.hadoop.hbase.master.LoadBalancer;
040import org.apache.hadoop.hbase.master.MasterServices;
041import org.apache.hadoop.hbase.master.RegionPlan;
042import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
043import org.apache.hadoop.hbase.net.Address;
044import org.apache.hadoop.util.ReflectionUtils;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
050import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
051import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap;
052import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
053import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
054import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
055
/**
 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBASE-6721).
 * It balances regions based on the group membership of each region's table.
 *
 * Most assignment methods contain two exclusive code paths: Online - when the group
 * table is online, and Offline - when it is unavailable.
 *
 * While Offline, regions are assigned based on cached information in ZooKeeper.
 * If that information is unavailable (i.e. during bootstrap), regions are assigned randomly.
 *
 * Once the GROUP table has been assigned, the balancer switches to Online and then
 * starts providing appropriate assignments for user tables.
 *
 */
070@InterfaceAudience.Private
071public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
  // Class-wide SLF4J logger.
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);

  // Configuration handed in through setConf(); initialize() reads the internal balancer class
  // from it and passes it on to the internal balancer.
  private Configuration config;
  // Latest cluster metrics handed in through setClusterMetrics(); forwarded to the internal
  // balancer by initialize().
  private ClusterMetrics clusterStatus;
  // Master services handed in through setMasterServices(); used by initialize() to locate the
  // RSGroupAdminEndpoint coprocessor and forwarded to the internal balancer.
  private MasterServices masterServices;
  // Source of RSGroup membership. Resolved by initialize() when still null, or injected through
  // setRsGroupInfoManager(). Declared volatile.
  private volatile RSGroupInfoManager rsGroupInfoManager;
  // Delegate balancer created in initialize(); it is invoked once per RSGroup to do the actual
  // balancing/assignment among the servers of that group.
  private LoadBalancer internalBalancer;
079
080  /**
081   * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
082   */
083  @InterfaceAudience.Private
084  public RSGroupBasedLoadBalancer() {}
085
086  @Override
087  public Configuration getConf() {
088    return config;
089  }
090
091  @Override
092  public void setConf(Configuration conf) {
093    this.config = conf;
094  }
095
096  @Override
097  public void setClusterMetrics(ClusterMetrics sm) {
098    this.clusterStatus = sm;
099  }
100
101  @Override
102  public void setMasterServices(MasterServices masterServices) {
103    this.masterServices = masterServices;
104  }
105
106  @Override
107  public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<RegionInfo>>
108      clusterState) throws HBaseIOException {
109    return balanceCluster(clusterState);
110  }
111
112  @Override
113  public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState)
114      throws HBaseIOException {
115    if (!isOnline()) {
116      throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
117          " is not online, unable to perform balance");
118    }
119
120    Map<ServerName,List<RegionInfo>> correctedState = correctAssignments(clusterState);
121    List<RegionPlan> regionPlans = new ArrayList<>();
122
123    List<RegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
124    for (RegionInfo regionInfo : misplacedRegions) {
125      ServerName serverName = findServerForRegion(clusterState, regionInfo);
126      regionPlans.add(new RegionPlan(regionInfo, serverName, null));
127    }
128    try {
129      // Record which region servers have been processed,so as to skip them after processed
130      HashSet<ServerName> processedServers = new HashSet<>();
131
132      // For each rsgroup
133      for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
134        Map<ServerName, List<RegionInfo>> groupClusterState = new HashMap<>();
135        Map<TableName, Map<ServerName, List<RegionInfo>>> groupClusterLoad = new HashMap<>();
136        for (ServerName server : clusterState.keySet()) { // for each region server
137          if (!processedServers.contains(server) // server is not processed yet
138              && rsgroup.containsServer(server.getAddress())) { // server belongs to this rsgroup
139            List<RegionInfo> regionsOnServer = correctedState.get(server);
140            groupClusterState.put(server, regionsOnServer);
141
142            processedServers.add(server);
143          }
144        }
145
146        groupClusterLoad.put(HConstants.ENSEMBLE_TABLE_NAME, groupClusterState);
147        this.internalBalancer.setClusterLoad(groupClusterLoad);
148        List<RegionPlan> groupPlans = this.internalBalancer
149            .balanceCluster(groupClusterState);
150        if (groupPlans != null) {
151          regionPlans.addAll(groupPlans);
152        }
153      }
154    } catch (IOException exp) {
155      LOG.warn("Exception while balancing cluster.", exp);
156      regionPlans.clear();
157    }
158    return regionPlans;
159  }
160
161  @Override
162  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
163    List<ServerName> servers) throws HBaseIOException {
164    Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
165    ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
166    ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
167    generateGroupMaps(regions, servers, regionMap, serverMap);
168    for (String groupKey : regionMap.keySet()) {
169      if (regionMap.get(groupKey).size() > 0) {
170        Map<ServerName, List<RegionInfo>> result = this.internalBalancer
171          .roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey));
172        if (result != null) {
173          if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
174            assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
175            assignments.get(LoadBalancer.BOGUS_SERVER_NAME)
176              .addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME));
177          } else {
178            assignments.putAll(result);
179          }
180        }
181      }
182    }
183    return assignments;
184  }
185
186  @Override
187  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
188    List<ServerName> servers) throws HBaseIOException {
189    try {
190      Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
191      ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
192      for (RegionInfo region : regions.keySet()) {
193        String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
194        if (groupName == null) {
195          LOG.debug("Group not found for table " + region.getTable() + ", using default");
196          groupName = RSGroupInfo.DEFAULT_GROUP;
197        }
198        groupToRegion.put(groupName, region);
199      }
200      for (String key : groupToRegion.keySet()) {
201        Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>();
202        List<RegionInfo> regionList = groupToRegion.get(key);
203        RSGroupInfo info = rsGroupInfoManager.getRSGroup(key);
204        List<ServerName> candidateList = filterOfflineServers(info, servers);
205        for (RegionInfo region : regionList) {
206          currentAssignmentMap.put(region, regions.get(region));
207        }
208        if (candidateList.size() > 0) {
209          assignments
210            .putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList));
211        } else {
212          if (LOG.isDebugEnabled()) {
213            LOG.debug("No available servers to assign regions: {}",
214              RegionInfo.getShortNameToLog(regionList));
215          }
216          assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
217            .addAll(regionList);
218        }
219      }
220      return assignments;
221    } catch (IOException e) {
222      throw new HBaseIOException("Failed to do online retain assignment", e);
223    }
224  }
225
226  @Override
227  public ServerName randomAssignment(RegionInfo region,
228      List<ServerName> servers) throws HBaseIOException {
229    ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create();
230    ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
231    generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
232    List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
233    return this.internalBalancer.randomAssignment(region, filteredServers);
234  }
235
236  private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers,
237    ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap)
238    throws HBaseIOException {
239    try {
240      for (RegionInfo region : regions) {
241        String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
242        if (groupName == null) {
243          LOG.debug("Group not found for table " + region.getTable() + ", using default");
244          groupName = RSGroupInfo.DEFAULT_GROUP;
245        }
246        regionMap.put(groupName, region);
247      }
248      for (String groupKey : regionMap.keySet()) {
249        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
250        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
251        if(serverMap.get(groupKey).size() < 1) {
252          serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
253        }
254      }
255    } catch(IOException e) {
256      throw new HBaseIOException("Failed to generate group maps", e);
257    }
258  }
259
260  private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
261    List<ServerName> onlineServers) {
262    if (RSGroupInfo != null) {
263      return filterServers(RSGroupInfo.getServers(), onlineServers);
264    } else {
265      LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
266      return Collections.emptyList();
267    }
268  }
269
270  /**
271   * Filter servers based on the online servers.
272   * <p/>
273   * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
274   * its contains()'s time complexity as O(logn), which is good enough.
275   * <p/>
276   * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
277   * needed.
278   * @param servers the servers
279   * @param onlineServers List of servers which are online.
280   * @return the list
281   */
282  private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
283    ArrayList<ServerName> finalList = new ArrayList<>();
284    for (ServerName onlineServer : onlineServers) {
285      if (servers.contains(onlineServer.getAddress())) {
286        finalList.add(onlineServer);
287      }
288    }
289    return finalList;
290  }
291
292  private ServerName findServerForRegion(
293      Map<ServerName, List<RegionInfo>> existingAssignments, RegionInfo region) {
294    for (Map.Entry<ServerName, List<RegionInfo>> entry : existingAssignments.entrySet()) {
295      if (entry.getValue().contains(region)) {
296        return entry.getKey();
297      }
298    }
299
300    throw new IllegalStateException("Could not find server for region "
301        + region.getShortNameToLog());
302  }
303
304  private Map<ServerName, List<RegionInfo>> correctAssignments(
305       Map<ServerName, List<RegionInfo>> existingAssignments)
306  throws HBaseIOException{
307    Map<ServerName, List<RegionInfo>> correctAssignments = new TreeMap<>();
308    correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<>());
309    for (Map.Entry<ServerName, List<RegionInfo>> assignments : existingAssignments.entrySet()){
310      ServerName sName = assignments.getKey();
311      correctAssignments.put(sName, new LinkedList<>());
312      List<RegionInfo> regions = assignments.getValue();
313      for (RegionInfo region : regions) {
314        RSGroupInfo targetRSGInfo = null;
315        try {
316          String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
317          if (groupName == null) {
318            LOG.debug("Group not found for table " + region.getTable() + ", using default");
319            groupName = RSGroupInfo.DEFAULT_GROUP;
320          }
321          targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
322        } catch (IOException exp) {
323          LOG.debug("RSGroup information null for region of table " + region.getTable(),
324              exp);
325        }
326        if ((targetRSGInfo == null) || (!targetRSGInfo.containsServer(sName.getAddress()))) {
327          correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
328        } else {
329          correctAssignments.get(sName).add(region);
330        }
331      }
332    }
333    return correctAssignments;
334  }
335
336  @Override
337  public void initialize() throws HBaseIOException {
338    try {
339      if (rsGroupInfoManager == null) {
340        List<RSGroupAdminEndpoint> cps =
341          masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class);
342        if (cps.size() != 1) {
343          String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size();
344          LOG.error(msg);
345          throw new HBaseIOException(msg);
346        }
347        rsGroupInfoManager = cps.get(0).getGroupInfoManager();
348        if(rsGroupInfoManager == null){
349          String msg = "RSGroupInfoManager hasn't been initialized";
350          LOG.error(msg);
351          throw new HBaseIOException(msg);
352        }
353        rsGroupInfoManager.start();
354      }
355    } catch (IOException e) {
356      throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e);
357    }
358
359    // Create the balancer
360    Class<? extends LoadBalancer> balancerKlass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS,
361        StochasticLoadBalancer.class, LoadBalancer.class);
362    internalBalancer = ReflectionUtils.newInstance(balancerKlass, config);
363    internalBalancer.setMasterServices(masterServices);
364    internalBalancer.setClusterMetrics(clusterStatus);
365    internalBalancer.setConf(config);
366    internalBalancer.initialize();
367  }
368
369  public boolean isOnline() {
370    if (this.rsGroupInfoManager == null) {
371      return false;
372    }
373
374    return this.rsGroupInfoManager.isOnline();
375  }
376
377  @Override
378  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
379  }
380
381  @Override
382  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
383  }
384
385  @Override
386  public void regionOffline(RegionInfo regionInfo) {
387  }
388
389  @Override
390  public void onConfigurationChange(Configuration conf) {
391    //DO nothing for now
392  }
393
394  @Override
395  public void stop(String why) {
396  }
397
398  @Override
399  public boolean isStopped() {
400    return false;
401  }
402
403  @VisibleForTesting
404  public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
405    this.rsGroupInfoManager = rsGroupInfoManager;
406  }
407
408  @Override
409  public void postMasterStartupInitialize() {
410    this.internalBalancer.postMasterStartupInitialize();
411  }
412
413  public void updateBalancerStatus(boolean status) {
414    internalBalancer.updateBalancerStatus(status);
415  }
416}