001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import edu.umd.cs.findbugs.annotations.NonNull;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.TreeMap;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.ClusterMetrics;
032import org.apache.hadoop.hbase.HBaseIOException;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.constraint.ConstraintException;
038import org.apache.hadoop.hbase.favored.FavoredNodesManager;
039import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
040import org.apache.hadoop.hbase.master.LoadBalancer;
041import org.apache.hadoop.hbase.master.MasterServices;
042import org.apache.hadoop.hbase.master.RegionPlan;
043import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider;
044import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
045import org.apache.hadoop.hbase.master.balancer.MasterClusterInfoProvider;
046import org.apache.hadoop.hbase.net.Address;
047import org.apache.hadoop.hbase.util.Pair;
048import org.apache.hadoop.hbase.util.ReflectionUtils;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
054import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
055import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
056import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
057
058/**
 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBASE-6721). It
 * balances regions based on a table's group membership.
061 * <p/>
062 * Most assignment methods contain two exclusive code paths: Online - when the group table is online
063 * and Offline - when it is unavailable.
064 * <p/>
 * During Offline, regions are assigned based on cached information in zookeeper. If that is
 * unavailable (i.e. during bootstrap), regions are assigned randomly.
067 * <p/>
068 * Once the GROUP table has been assigned, the balancer switches to Online and will then start
069 * providing appropriate assignments for user tables.
070 */
071@InterfaceAudience.Private
072public class RSGroupBasedLoadBalancer implements LoadBalancer {
073  private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);
074
075  private MasterServices masterServices;
076  private ClusterInfoProvider provider;
077  private FavoredNodesManager favoredNodesManager;
078  private volatile RSGroupInfoManager rsGroupInfoManager;
079  private volatile LoadBalancer internalBalancer;
080
081  /**
   * Set this key to {@code true} to allow region fallback. Regions fall back to the default
   * rsgroup first, and then to any group if the default rsgroup has no online servers. Please
   * keep the balancer switch on at the same time, since the balancer is relied on to correct
   * misplaced regions.
085   */
086  public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";
087
088  private volatile boolean fallbackEnabled = false;
089
090  /**
091   * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
092   */
093  @InterfaceAudience.Private
094  public RSGroupBasedLoadBalancer() {
095  }
096
097  // must be called after calling initialize
098  @Override
099  public synchronized void updateClusterMetrics(ClusterMetrics sm) {
100    assert internalBalancer != null;
101    internalBalancer.updateClusterMetrics(sm);
102  }
103
104  @Override
105  public synchronized void
106    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
107    internalBalancer.updateBalancerLoadInfo(loadOfAllTable);
108  }
109
110  public void setMasterServices(MasterServices masterServices) {
111    this.masterServices = masterServices;
112  }
113
114  /**
115   * Balance by RSGroup.
116   */
117  @Override
118  public synchronized List<RegionPlan> balanceCluster(
119    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
120    if (!isOnline()) {
121      throw new ConstraintException(
122        RSGroupInfoManager.class.getSimpleName() + " is not online, unable to perform balance");
123    }
124
125    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
126    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>,
127      List<RegionPlan>> correctedStateAndRegionPlans = correctAssignments(loadOfAllTable);
128    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable =
129      correctedStateAndRegionPlans.getFirst();
130    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
131    RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
132    // Add RegionPlan
133    // for the regions which have been placed according to the region server group assignment
134    // into the movement list
135    try {
136      // For each rsgroup
137      for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
138        LOG.debug("Balancing RSGroup={}", rsgroup.getName());
139        Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>();
140        for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable
141          .entrySet()) {
142          TableName tableName = entry.getKey();
143          RSGroupInfo targetRSGInfo = RSGroupUtil
144            .getRSGroupInfo(masterServices, rsGroupInfoManager, tableName).orElse(defaultInfo);
145          if (targetRSGInfo.getName().equals(rsgroup.getName())) {
146            loadOfTablesInGroup.put(tableName, entry.getValue());
147          }
148        }
149        List<RegionPlan> groupPlans = null;
150        if (!loadOfTablesInGroup.isEmpty()) {
151          LOG.info("Start Generate Balance plan for group: " + rsgroup.getName());
152          groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup);
153        }
154        if (groupPlans != null) {
155          regionPlans.addAll(groupPlans);
156        }
157      }
158    } catch (IOException exp) {
159      LOG.warn("Exception while balancing cluster.", exp);
160      regionPlans.clear();
161    }
162
163    // Return the whole movement list
164    return regionPlans;
165  }
166
167  @Override
168  @NonNull
169  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
170    List<ServerName> servers) throws IOException {
171    Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
172    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
173      generateGroupAssignments(regions, servers);
174    for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
175      Map<ServerName, List<RegionInfo>> result =
176        this.internalBalancer.roundRobinAssignment(pair.getFirst(), pair.getSecond());
177      result.forEach((server, regionInfos) -> assignments
178        .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(regionInfos));
179    }
180    return assignments;
181  }
182
183  @Override
184  @NonNull
185  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
186    List<ServerName> servers) throws HBaseIOException {
187    try {
188      Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
189      List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
190        generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers);
191      for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
192        List<RegionInfo> regionList = pair.getFirst();
193        Map<RegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap();
194        regionList.forEach(r -> currentAssignmentMap.put(r, regions.get(r)));
195        Map<ServerName, List<RegionInfo>> pairResult =
196          this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond());
197        pairResult.forEach((server, rs) -> assignments
198          .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(rs));
199      }
200      return assignments;
201    } catch (IOException e) {
202      throw new HBaseIOException("Failed to do online retain assignment", e);
203    }
204  }
205
206  @Override
207  public ServerName randomAssignment(RegionInfo region, List<ServerName> servers)
208    throws IOException {
209    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
210      generateGroupAssignments(Lists.newArrayList(region), servers);
211    List<ServerName> filteredServers = pairs.iterator().next().getSecond();
212    return this.internalBalancer.randomAssignment(region, filteredServers);
213  }
214
215  private List<Pair<List<RegionInfo>, List<ServerName>>> generateGroupAssignments(
216    List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
217    try {
218      ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
219      ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
220      RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
221      for (RegionInfo region : regions) {
222        String groupName =
223          RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable())
224            .orElse(defaultInfo).getName();
225        regionMap.put(groupName, region);
226      }
227      for (String groupKey : regionMap.keySet()) {
228        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
229        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
230      }
231
232      List<Pair<List<RegionInfo>, List<ServerName>>> result = Lists.newArrayList();
233      List<RegionInfo> fallbackRegions = Lists.newArrayList();
234      for (String groupKey : regionMap.keySet()) {
235        if (serverMap.get(groupKey).isEmpty()) {
236          fallbackRegions.addAll(regionMap.get(groupKey));
237        } else {
238          result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey)));
239        }
240      }
241      if (!fallbackRegions.isEmpty()) {
242        List<ServerName> candidates = null;
243        if (isFallbackEnabled()) {
244          if (LOG.isDebugEnabled()) {
245            LOG.debug("Falling back {} regions to servers outside their RSGroup. Regions: {}",
246              fallbackRegions.size(), fallbackRegions.stream()
247                .map(RegionInfo::getRegionNameAsString).collect(Collectors.toSet()));
248          }
249          candidates = getFallBackCandidates(servers);
250        }
251        candidates = (candidates == null || candidates.isEmpty())
252          ? Lists.newArrayList(BOGUS_SERVER_NAME)
253          : candidates;
254        result.add(Pair.newPair(fallbackRegions, candidates));
255      }
256      return result;
257    } catch (IOException e) {
258      throw new HBaseIOException("Failed to generate group assignments", e);
259    }
260  }
261
262  private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
263    List<ServerName> onlineServers) {
264    if (RSGroupInfo != null) {
265      return filterServers(RSGroupInfo.getServers(), onlineServers);
266    } else {
267      LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
268      return Collections.emptyList();
269    }
270  }
271
272  /**
273   * Filter servers based on the online servers.
274   * <p/>
275   * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
276   * its contains()'s time complexity as O(logn), which is good enough.
277   * <p/>
278   * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
279   * needed.
280   * @param servers       the servers
281   * @param onlineServers List of servers which are online.
282   * @return the list
283   */
284  private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
285    ArrayList<ServerName> finalList = new ArrayList<>();
286    for (ServerName onlineServer : onlineServers) {
287      if (servers.contains(onlineServer.getAddress())) {
288        finalList.add(onlineServer);
289      }
290    }
291    return finalList;
292  }
293
294  private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
295    correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments)
296      throws IOException {
297    // To return
298    Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>();
299    List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
300    RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
301    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments
302      .entrySet()) {
303      TableName tableName = assignments.getKey();
304      Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue();
305      RSGroupInfo targetRSGInfo = null;
306      Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>();
307      try {
308        targetRSGInfo = RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, tableName)
309          .orElse(defaultInfo);
310      } catch (IOException exp) {
311        LOG.debug("RSGroup information null for region of table " + tableName, exp);
312      }
313      for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) {
314        ServerName currentHostServer = serverRegionMap.getKey();
315        List<RegionInfo> regionInfoList = serverRegionMap.getValue();
316        if (
317          targetRSGInfo == null || !targetRSGInfo.containsServer(currentHostServer.getAddress())
318        ) {
319          regionInfoList.forEach(regionInfo -> {
320            regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null));
321          });
322        } else {
323          correctServerRegion.put(currentHostServer, regionInfoList);
324        }
325      }
326      correctAssignments.put(tableName, correctServerRegion);
327    }
328
329    // Return correct assignments and region movement plan for mis-placed regions together
330    return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>(
331      correctAssignments, regionPlansForMisplacedRegions);
332  }
333
334  @Override
335  public void initialize() throws IOException {
336    if (rsGroupInfoManager == null) {
337      rsGroupInfoManager = masterServices.getRSGroupInfoManager();
338      if (rsGroupInfoManager == null) {
339        String msg = "RSGroupInfoManager hasn't been initialized";
340        LOG.error(msg);
341        throw new HBaseIOException(msg);
342      }
343      rsGroupInfoManager.start();
344    }
345
346    // Create the balancer
347    Configuration conf = masterServices.getConfiguration();
348    Class<? extends LoadBalancer> balancerClass;
349    @SuppressWarnings("deprecation")
350    String balancerClassName = conf.get(HBASE_RSGROUP_LOADBALANCER_CLASS);
351    if (balancerClassName == null) {
352      balancerClass = conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
353        LoadBalancerFactory.getDefaultLoadBalancerClass(), LoadBalancer.class);
354    } else {
355      try {
356        balancerClass = Class.forName(balancerClassName).asSubclass(LoadBalancer.class);
357      } catch (ClassNotFoundException e) {
358        throw new IOException(e);
359      }
360    }
361    this.provider = new MasterClusterInfoProvider(masterServices);
362    // avoid infinite nesting
363    if (getClass().isAssignableFrom(balancerClass)) {
364      balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass();
365    }
366    internalBalancer = ReflectionUtils.newInstance(balancerClass);
367    internalBalancer.setClusterInfoProvider(provider);
368    // special handling for favor node balancers
369    if (internalBalancer instanceof FavoredNodesPromoter) {
370      favoredNodesManager = new FavoredNodesManager(provider);
371      ((FavoredNodesPromoter) internalBalancer).setFavoredNodesManager(favoredNodesManager);
372    }
373    internalBalancer.initialize();
374    // init fallback groups
375    this.fallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
376  }
377
378  public boolean isOnline() {
379    if (this.rsGroupInfoManager == null) {
380      return false;
381    }
382
383    return this.rsGroupInfoManager.isOnline();
384  }
385
386  public boolean isFallbackEnabled() {
387    return fallbackEnabled;
388  }
389
390  @Override
391  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
392  }
393
394  @Override
395  public void regionOffline(RegionInfo regionInfo) {
396  }
397
398  @Override
399  public synchronized void onConfigurationChange(Configuration conf) {
400    boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
401    if (fallbackEnabled != newFallbackEnabled) {
402      LOG.info("Changing the value of {} from {} to {}", FALLBACK_GROUP_ENABLE_KEY, fallbackEnabled,
403        newFallbackEnabled);
404      fallbackEnabled = newFallbackEnabled;
405    }
406    provider.onConfigurationChange(conf);
407    internalBalancer.onConfigurationChange(conf);
408  }
409
410  @Override
411  public void stop(String why) {
412    internalBalancer.stop(why);
413  }
414
415  @Override
416  public boolean isStopped() {
417    return internalBalancer.isStopped();
418  }
419
420  public LoadBalancer getInternalBalancer() {
421    return internalBalancer;
422  }
423
424  public FavoredNodesManager getFavoredNodesManager() {
425    return favoredNodesManager;
426  }
427
428  @Override
429  public synchronized void postMasterStartupInitialize() {
430    this.internalBalancer.postMasterStartupInitialize();
431  }
432
433  public void updateBalancerStatus(boolean status) {
434    internalBalancer.updateBalancerStatus(status);
435  }
436
437  private List<ServerName> getFallBackCandidates(List<ServerName> servers) {
438    List<ServerName> serverNames = null;
439    try {
440      RSGroupInfo info = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
441      serverNames = filterOfflineServers(info, servers);
442    } catch (IOException e) {
443      LOG.error("Failed to get default rsgroup info to fallback", e);
444    }
445    return serverNames == null || serverNames.isEmpty() ? servers : serverNames;
446  }
447
448  @Override
449  public void setClusterInfoProvider(ClusterInfoProvider provider) {
450    throw new UnsupportedOperationException("Just call set master service instead");
451  }
452}