001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.TreeMap;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.ClusterMetrics;
030import org.apache.hadoop.hbase.HBaseIOException;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.constraint.ConstraintException;
036import org.apache.hadoop.hbase.favored.FavoredNodesManager;
037import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
038import org.apache.hadoop.hbase.master.LoadBalancer;
039import org.apache.hadoop.hbase.master.MasterServices;
040import org.apache.hadoop.hbase.master.RegionPlan;
041import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
042import org.apache.hadoop.hbase.net.Address;
043import org.apache.hadoop.hbase.util.Pair;
044import org.apache.hadoop.hbase.util.ReflectionUtils;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
050import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
051import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap;
052import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
053import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
054import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
055
/**
 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBASE-6721). It
 * balances regions based on a table's group membership.
 * <p/>
 * Most assignment methods contain two exclusive code paths: Online - when the group table is
 * online, and Offline - when it is unavailable.
 * <p/>
 * During Offline, regions are assigned based on cached information in ZooKeeper. If that
 * information is unavailable (i.e. during bootstrap) then regions are assigned randomly.
 * <p/>
 * Once the GROUP table has been assigned, the balancer switches to Online and will then start
 * providing appropriate assignments for user tables.
 */
069@InterfaceAudience.Private
070public class RSGroupBasedLoadBalancer implements LoadBalancer {
071  private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);
072
073  private Configuration config;
074  private ClusterMetrics clusterStatus;
075  private MasterServices masterServices;
076  private FavoredNodesManager favoredNodesManager;
077  private volatile RSGroupInfoManager rsGroupInfoManager;
078  private LoadBalancer internalBalancer;
079
080  /**
081   * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
082   */
083  @InterfaceAudience.Private
084  public RSGroupBasedLoadBalancer() {}
085
086  @Override
087  public Configuration getConf() {
088    return config;
089  }
090
091  @Override
092  public void setConf(Configuration conf) {
093    this.config = conf;
094    if(internalBalancer != null) {
095      internalBalancer.setConf(conf);
096    }
097  }
098
099  @Override
100  public void setClusterMetrics(ClusterMetrics sm) {
101    this.clusterStatus = sm;
102    if (internalBalancer != null) {
103      internalBalancer.setClusterMetrics(sm);
104    }
105  }
106
107  @Override
108  public void setMasterServices(MasterServices masterServices) {
109    this.masterServices = masterServices;
110  }
111
112  /**
113   * Override to balance by RSGroup
114   * not invoke {@link #balanceTable(TableName, Map)}
115   */
116  @Override
117  public List<RegionPlan> balanceCluster(
118      Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
119    if (!isOnline()) {
120      throw new ConstraintException(
121          RSGroupInfoManager.class.getSimpleName() + " is not online, unable to perform balance");
122    }
123
124    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
125    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
126      correctedStateAndRegionPlans = correctAssignments(loadOfAllTable);
127    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable =
128        correctedStateAndRegionPlans.getFirst();
129    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
130    RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
131    // Add RegionPlan
132    // for the regions which have been placed according to the region server group assignment
133    // into the movement list
134    try {
135      // For each rsgroup
136      for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
137        Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>();
138        for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable
139            .entrySet()) {
140          TableName tableName = entry.getKey();
141          RSGroupInfo targetRSGInfo = RSGroupUtil
142              .getRSGroupInfo(masterServices, rsGroupInfoManager, tableName).orElse(defaultInfo);
143          if (targetRSGInfo.getName().equals(rsgroup.getName())) {
144            loadOfTablesInGroup.put(tableName, entry.getValue());
145          }
146        }
147        List<RegionPlan> groupPlans = null;
148        if (!loadOfTablesInGroup.isEmpty()) {
149          LOG.info("Start Generate Balance plan for group: " + rsgroup.getName());
150          groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup);
151        }
152        if (groupPlans != null) {
153          regionPlans.addAll(groupPlans);
154        }
155      }
156    } catch (IOException exp) {
157      LOG.warn("Exception while balancing cluster.", exp);
158      regionPlans.clear();
159    }
160
161    // Return the whole movement list
162    return regionPlans;
163  }
164
165  @Override
166  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(
167      List<RegionInfo> regions, List<ServerName> servers) throws IOException {
168    Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
169    ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
170    ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
171    generateGroupMaps(regions, servers, regionMap, serverMap);
172    for (String groupKey : regionMap.keySet()) {
173      if (regionMap.get(groupKey).size() > 0) {
174        Map<ServerName, List<RegionInfo>> result = this.internalBalancer
175          .roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey));
176        if (result != null) {
177          if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
178            assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
179            assignments.get(LoadBalancer.BOGUS_SERVER_NAME)
180              .addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME));
181          } else {
182            assignments.putAll(result);
183          }
184        }
185      }
186    }
187    return assignments;
188  }
189
190  @Override
191  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
192    List<ServerName> servers) throws HBaseIOException {
193    try {
194      Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
195      ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
196      RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
197      for (RegionInfo region : regions.keySet()) {
198        String groupName =
199          RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable())
200              .orElse(defaultInfo).getName();
201        groupToRegion.put(groupName, region);
202      }
203      for (String key : groupToRegion.keySet()) {
204        Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>();
205        List<RegionInfo> regionList = groupToRegion.get(key);
206        RSGroupInfo info = rsGroupInfoManager.getRSGroup(key);
207        List<ServerName> candidateList = filterOfflineServers(info, servers);
208        for (RegionInfo region : regionList) {
209          currentAssignmentMap.put(region, regions.get(region));
210        }
211        if (candidateList.size() > 0) {
212          assignments
213            .putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList));
214        } else {
215          if (LOG.isDebugEnabled()) {
216            LOG.debug("No available servers to assign regions: {}",
217              RegionInfo.getShortNameToLog(regionList));
218          }
219          assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
220            .addAll(regionList);
221        }
222      }
223      return assignments;
224    } catch (IOException e) {
225      throw new HBaseIOException("Failed to do online retain assignment", e);
226    }
227  }
228
229  @Override
230  public ServerName randomAssignment(RegionInfo region,
231      List<ServerName> servers) throws IOException {
232    ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create();
233    ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
234    generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
235    List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
236    return this.internalBalancer.randomAssignment(region, filteredServers);
237  }
238
239  private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers,
240    ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap)
241    throws HBaseIOException {
242    try {
243      RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
244      for (RegionInfo region : regions) {
245        String groupName =
246            RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable())
247                .orElse(defaultInfo).getName();
248        regionMap.put(groupName, region);
249      }
250      for (String groupKey : regionMap.keySet()) {
251        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
252        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
253        if(serverMap.get(groupKey).size() < 1) {
254          serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
255        }
256      }
257    } catch(IOException e) {
258      throw new HBaseIOException("Failed to generate group maps", e);
259    }
260  }
261
262  private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
263                                                List<ServerName> onlineServers) {
264    if (RSGroupInfo != null) {
265      return filterServers(RSGroupInfo.getServers(), onlineServers);
266    } else {
267      LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
268      return Collections.emptyList();
269    }
270  }
271
272  /**
273   * Filter servers based on the online servers.
274   * <p/>
275   * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
276   * its contains()'s time complexity as O(logn), which is good enough.
277   * <p/>
278   * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
279   * needed.
280   * @param servers the servers
281   * @param onlineServers List of servers which are online.
282   * @return the list
283   */
284  private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
285    ArrayList<ServerName> finalList = new ArrayList<>();
286    for (ServerName onlineServer : onlineServers) {
287      if (servers.contains(onlineServer.getAddress())) {
288        finalList.add(onlineServer);
289      }
290    }
291    return finalList;
292  }
293
294  private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
295      correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments)
296          throws IOException {
297    // To return
298    Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>();
299    List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
300    RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
301    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments
302        .entrySet()) {
303      TableName tableName = assignments.getKey();
304      Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue();
305      RSGroupInfo targetRSGInfo = null;
306      Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>();
307      try {
308        targetRSGInfo = RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, tableName)
309            .orElse(defaultInfo);
310      } catch (IOException exp) {
311        LOG.debug("RSGroup information null for region of table " + tableName, exp);
312      }
313      for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) {
314        ServerName currentHostServer = serverRegionMap.getKey();
315        List<RegionInfo> regionInfoList = serverRegionMap.getValue();
316        if (targetRSGInfo == null
317            || !targetRSGInfo.containsServer(currentHostServer.getAddress())) {
318          regionInfoList.forEach(regionInfo -> {
319            regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null));
320          });
321        } else {
322          correctServerRegion.put(currentHostServer, regionInfoList);
323        }
324      }
325      correctAssignments.put(tableName, correctServerRegion);
326    }
327
328    // Return correct assignments and region movement plan for mis-placed regions together
329    return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>(
330        correctAssignments, regionPlansForMisplacedRegions);
331  }
332
333  @Override
334  public void initialize() throws IOException {
335    if (rsGroupInfoManager == null) {
336      rsGroupInfoManager = masterServices.getRSGroupInfoManager();
337      if (rsGroupInfoManager == null) {
338        String msg = "RSGroupInfoManager hasn't been initialized";
339        LOG.error(msg);
340        throw new HBaseIOException(msg);
341      }
342      rsGroupInfoManager.start();
343    }
344
345    // Create the balancer
346    Class<? extends LoadBalancer> balancerClass;
347    String balancerClassName = config.get(HBASE_RSGROUP_LOADBALANCER_CLASS);
348    if (balancerClassName == null) {
349      balancerClass = config.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
350        LoadBalancerFactory.getDefaultLoadBalancerClass(), LoadBalancer.class);
351    } else {
352      try {
353        balancerClass = Class.forName(balancerClassName).asSubclass(LoadBalancer.class);
354      } catch (ClassNotFoundException e) {
355        throw new IOException(e);
356      }
357    }
358    // avoid infinite nesting
359    if (getClass().isAssignableFrom(balancerClass)) {
360      balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass();
361    }
362    internalBalancer = ReflectionUtils.newInstance(balancerClass);
363    if (internalBalancer instanceof FavoredNodesPromoter) {
364      favoredNodesManager = new FavoredNodesManager(masterServices);
365    }
366    internalBalancer.setConf(config);
367    internalBalancer.setMasterServices(masterServices);
368    if(clusterStatus != null) {
369      internalBalancer.setClusterMetrics(clusterStatus);
370    }
371    internalBalancer.initialize();
372  }
373
374  public boolean isOnline() {
375    if (this.rsGroupInfoManager == null) {
376      return false;
377    }
378
379    return this.rsGroupInfoManager.isOnline();
380  }
381
382
383  @Override
384  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
385  }
386
387  @Override
388  public void regionOffline(RegionInfo regionInfo) {
389  }
390
391  @Override
392  public void onConfigurationChange(Configuration conf) {
393    //DO nothing for now
394  }
395
396  @Override
397  public void stop(String why) {
398  }
399
400  @Override
401  public boolean isStopped() {
402    return false;
403  }
404
405  @VisibleForTesting
406  public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
407    this.rsGroupInfoManager = rsGroupInfoManager;
408  }
409
410  public LoadBalancer getInternalBalancer() {
411    return internalBalancer;
412  }
413
414  public FavoredNodesManager getFavoredNodesManager() {
415    return favoredNodesManager;
416  }
417
418  @Override
419  public void postMasterStartupInitialize() {
420    this.internalBalancer.postMasterStartupInitialize();
421  }
422
423  public void updateBalancerStatus(boolean status) {
424    internalBalancer.updateBalancerStatus(status);
425  }
426
427  /**
428   * can achieve table balanced rather than overall balanced
429   */
430  @Override
431  public List<RegionPlan> balanceTable(TableName tableName,
432      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
433    if (!isOnline()) {
434      LOG.error(RSGroupInfoManager.class.getSimpleName()
435          + " is not online, unable to perform balanceTable");
436      return null;
437    }
438    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfThisTable = new HashMap<>();
439    loadOfThisTable.put(tableName, loadOfOneTable);
440    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
441      correctedStateAndRegionPlans;
442    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
443    try {
444      correctedStateAndRegionPlans = correctAssignments(loadOfThisTable);
445    } catch (IOException e) {
446      LOG.error("get correct assignments and mis-placed regions error ", e);
447      return null;
448    }
449    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfThisTable =
450        correctedStateAndRegionPlans.getFirst();
451    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
452    List<RegionPlan> tablePlans =
453        this.internalBalancer.balanceTable(tableName, correctedLoadOfThisTable.get(tableName));
454
455    if (tablePlans != null) {
456      regionPlans.addAll(tablePlans);
457    }
458    return regionPlans;
459  }
460}