001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import edu.umd.cs.findbugs.annotations.NonNull;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.TreeMap;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.atomic.AtomicReference;
031import java.util.stream.Collectors;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.ClusterMetrics;
034import org.apache.hadoop.hbase.HBaseIOException;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.constraint.ConstraintException;
040import org.apache.hadoop.hbase.favored.FavoredNodesManager;
041import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
042import org.apache.hadoop.hbase.master.LoadBalancer;
043import org.apache.hadoop.hbase.master.MasterServices;
044import org.apache.hadoop.hbase.master.RegionPlan;
045import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider;
046import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
047import org.apache.hadoop.hbase.master.balancer.MasterClusterInfoProvider;
048import org.apache.hadoop.hbase.net.Address;
049import org.apache.hadoop.hbase.util.Pair;
050import org.apache.hadoop.hbase.util.ReflectionUtils;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
056import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
057import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
058import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
059
060/**
061 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721) It does
062 * region balance based on a table's group membership.
063 * <p/>
064 * Most assignment methods contain two exclusive code paths: Online - when the group table is online
065 * and Offline - when it is unavailable.
066 * <p/>
067 * During Offline, assignments are assigned based on cached information in zookeeper. If unavailable
068 * (ie bootstrap) then regions are assigned randomly.
069 * <p/>
070 * Once the GROUP table has been assigned, the balancer switches to Online and will then start
071 * providing appropriate assignments for user tables.
072 */
073@InterfaceAudience.Private
074public class RSGroupBasedLoadBalancer implements LoadBalancer {
075  private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);
076
077  private MasterServices masterServices;
078  private ClusterInfoProvider provider;
079  private FavoredNodesManager favoredNodesManager;
080  private volatile RSGroupInfoManager rsGroupInfoManager;
081  private volatile LoadBalancer internalBalancer;
082
083  /**
084   * Tracks whether a balance run is currently in progress.
085   */
086  private final AtomicBoolean isBalancing = new AtomicBoolean(false);
087
088  /**
089   * Holds a configuration update that arrived while a balance run was in progress.
090   */
091  private AtomicReference<Configuration> pendingConfiguration = new AtomicReference<>();
092
093  /**
094   * Set this key to {@code true} to allow region fallback. Fallback to the default rsgroup first,
095   * then fallback to any group if no online servers in default rsgroup. Please keep balancer switch
096   * on at the same time, which is relied on to correct misplaced regions
097   */
098  public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";
099
100  private volatile boolean fallbackEnabled = false;
101
  /**
   * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
   * <p/>
   * The constructor does no work; collaborators are supplied afterwards through
   * {@link #setMasterServices(MasterServices)} and created in {@link #initialize()}.
   */
  @InterfaceAudience.Private
  public RSGroupBasedLoadBalancer() {
  }
108
109  // must be called after calling initialize
110  @Override
111  public synchronized void updateClusterMetrics(ClusterMetrics sm) {
112    assert internalBalancer != null;
113    internalBalancer.updateClusterMetrics(sm);
114  }
115
116  @Override
117  public synchronized void
118    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
119    internalBalancer.updateBalancerLoadInfo(loadOfAllTable);
120  }
121
  /**
   * Stores the master services handle. {@link #initialize()} reads the configuration and the
   * {@link RSGroupInfoManager} from it, so it has to be set before initialization.
   * @param masterServices the master services to use
   */
  public void setMasterServices(MasterServices masterServices) {
    this.masterServices = masterServices;
  }
125
126  /**
127   * Balance by RSGroup.
128   */
129  @Override
130  public synchronized List<RegionPlan> balanceCluster(
131    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
132    if (!isOnline()) {
133      throw new ConstraintException(
134        RSGroupInfoManager.class.getSimpleName() + " is not online, unable to perform balance");
135    }
136
137    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
138    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>,
139      List<RegionPlan>> correctedStateAndRegionPlans = correctAssignments(loadOfAllTable);
140    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable =
141      correctedStateAndRegionPlans.getFirst();
142    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
143    RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
144    // Add RegionPlan
145    // for the regions which have been placed according to the region server group assignment
146    // into the movement list
147    try {
148      // For each rsgroup
149      for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
150        LOG.debug("Balancing RSGroup={}", rsgroup.getName());
151        Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>();
152        for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable
153          .entrySet()) {
154          TableName tableName = entry.getKey();
155          RSGroupInfo targetRSGInfo = RSGroupUtil
156            .getRSGroupInfo(masterServices, rsGroupInfoManager, tableName).orElse(defaultInfo);
157          if (targetRSGInfo.getName().equals(rsgroup.getName())) {
158            loadOfTablesInGroup.put(tableName, entry.getValue());
159          }
160        }
161        List<RegionPlan> groupPlans = null;
162        if (!loadOfTablesInGroup.isEmpty()) {
163          LOG.info("Start Generate Balance plan for group: " + rsgroup.getName());
164          groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup);
165        }
166        if (groupPlans != null) {
167          regionPlans.addAll(groupPlans);
168        }
169      }
170    } catch (IOException exp) {
171      LOG.warn("Exception while balancing cluster.", exp);
172      regionPlans.clear();
173    }
174
175    // Return the whole movement list
176    return regionPlans;
177  }
178
179  @Override
180  @NonNull
181  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
182    List<ServerName> servers) throws IOException {
183    Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
184    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
185      generateGroupAssignments(regions, servers);
186    for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
187      Map<ServerName, List<RegionInfo>> result =
188        this.internalBalancer.roundRobinAssignment(pair.getFirst(), pair.getSecond());
189      result.forEach((server, regionInfos) -> assignments
190        .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(regionInfos));
191    }
192    return assignments;
193  }
194
195  @Override
196  @NonNull
197  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
198    List<ServerName> servers) throws HBaseIOException {
199    try {
200      Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
201      List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
202        generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers);
203      for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
204        List<RegionInfo> regionList = pair.getFirst();
205        Map<RegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap();
206        regionList.forEach(r -> currentAssignmentMap.put(r, regions.get(r)));
207        Map<ServerName, List<RegionInfo>> pairResult =
208          this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond());
209        pairResult.forEach((server, rs) -> assignments
210          .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(rs));
211      }
212      return assignments;
213    } catch (IOException e) {
214      throw new HBaseIOException("Failed to do online retain assignment", e);
215    }
216  }
217
218  @Override
219  public ServerName randomAssignment(RegionInfo region, List<ServerName> servers)
220    throws IOException {
221    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
222      generateGroupAssignments(Lists.newArrayList(region), servers);
223    List<ServerName> filteredServers = pairs.iterator().next().getSecond();
224    return this.internalBalancer.randomAssignment(region, filteredServers);
225  }
226
227  private List<Pair<List<RegionInfo>, List<ServerName>>> generateGroupAssignments(
228    List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
229    try {
230      ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
231      ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
232      RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
233      for (RegionInfo region : regions) {
234        String groupName =
235          RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable())
236            .orElse(defaultInfo).getName();
237        regionMap.put(groupName, region);
238      }
239      for (String groupKey : regionMap.keySet()) {
240        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
241        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
242      }
243
244      List<Pair<List<RegionInfo>, List<ServerName>>> result = Lists.newArrayList();
245      List<RegionInfo> fallbackRegions = Lists.newArrayList();
246      for (String groupKey : regionMap.keySet()) {
247        if (serverMap.get(groupKey).isEmpty()) {
248          fallbackRegions.addAll(regionMap.get(groupKey));
249        } else {
250          result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey)));
251        }
252      }
253      if (!fallbackRegions.isEmpty()) {
254        List<ServerName> candidates = null;
255        if (isFallbackEnabled()) {
256          if (LOG.isDebugEnabled()) {
257            LOG.debug("Falling back {} regions to servers outside their RSGroup. Regions: {}",
258              fallbackRegions.size(), fallbackRegions.stream()
259                .map(RegionInfo::getRegionNameAsString).collect(Collectors.toSet()));
260          }
261          candidates = getFallBackCandidates(servers);
262        }
263        candidates = (candidates == null || candidates.isEmpty())
264          ? Lists.newArrayList(BOGUS_SERVER_NAME)
265          : candidates;
266        result.add(Pair.newPair(fallbackRegions, candidates));
267      }
268      return result;
269    } catch (IOException e) {
270      throw new HBaseIOException("Failed to generate group assignments", e);
271    }
272  }
273
274  private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
275    List<ServerName> onlineServers) {
276    if (RSGroupInfo != null) {
277      return filterServers(RSGroupInfo.getServers(), onlineServers);
278    } else {
279      LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
280      return Collections.emptyList();
281    }
282  }
283
284  /**
285   * Filter servers based on the online servers.
286   * <p/>
287   * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
288   * its contains()'s time complexity as O(logn), which is good enough.
289   * <p/>
290   * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
291   * needed.
292   * @param servers       the servers
293   * @param onlineServers List of servers which are online.
294   * @return the list
295   */
296  private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
297    ArrayList<ServerName> finalList = new ArrayList<>();
298    for (ServerName onlineServer : onlineServers) {
299      if (servers.contains(onlineServer.getAddress())) {
300        finalList.add(onlineServer);
301      }
302    }
303    return finalList;
304  }
305
306  private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
307    correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments)
308      throws IOException {
309    // To return
310    Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>();
311    List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
312    RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
313    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments
314      .entrySet()) {
315      TableName tableName = assignments.getKey();
316      Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue();
317      RSGroupInfo targetRSGInfo = null;
318      Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>();
319      try {
320        targetRSGInfo = RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, tableName)
321          .orElse(defaultInfo);
322      } catch (IOException exp) {
323        LOG.debug("RSGroup information null for region of table " + tableName, exp);
324      }
325      for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) {
326        ServerName currentHostServer = serverRegionMap.getKey();
327        List<RegionInfo> regionInfoList = serverRegionMap.getValue();
328        if (
329          targetRSGInfo == null || !targetRSGInfo.containsServer(currentHostServer.getAddress())
330        ) {
331          regionInfoList.forEach(regionInfo -> {
332            regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null));
333          });
334        } else {
335          correctServerRegion.put(currentHostServer, regionInfoList);
336        }
337      }
338      correctAssignments.put(tableName, correctServerRegion);
339    }
340
341    // Return correct assignments and region movement plan for mis-placed regions together
342    return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>(
343      correctAssignments, regionPlansForMisplacedRegions);
344  }
345
  /**
   * Prepares this balancer for use: obtains and starts the {@link RSGroupInfoManager} (first call
   * only), creates and initializes the internal balancer, and reads the fallback switch from the
   * configuration.
   * @throws IOException if the master services have no {@link RSGroupInfoManager}, if the
   *                     configured rsgroup balancer class cannot be found, or if a delegate call
   *                     fails
   */
  @Override
  public void initialize() throws IOException {
    // Only fetch and start the manager once; later calls reuse the existing one
    if (rsGroupInfoManager == null) {
      rsGroupInfoManager = masterServices.getRSGroupInfoManager();
      if (rsGroupInfoManager == null) {
        String msg = "RSGroupInfoManager hasn't been initialized";
        LOG.error(msg);
        throw new HBaseIOException(msg);
      }
      rsGroupInfoManager.start();
    }

    // Create the balancer
    Configuration conf = masterServices.getConfiguration();
    Class<? extends LoadBalancer> balancerClass;
    // The rsgroup-specific key takes precedence; without it the master's balancer class setting
    // (or the factory default) is used
    @SuppressWarnings("deprecation")
    String balancerClassName = conf.get(HBASE_RSGROUP_LOADBALANCER_CLASS);
    if (balancerClassName == null) {
      balancerClass = conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
        LoadBalancerFactory.getDefaultLoadBalancerClass(), LoadBalancer.class);
    } else {
      try {
        balancerClass = Class.forName(balancerClassName).asSubclass(LoadBalancer.class);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }
    }
    this.provider = new MasterClusterInfoProvider(masterServices);
    // avoid infinite nesting: if the chosen class is this class or a subclass of it, use the
    // factory default instead
    if (getClass().isAssignableFrom(balancerClass)) {
      balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass();
    }
    internalBalancer = ReflectionUtils.newInstance(balancerClass);
    internalBalancer.setClusterInfoProvider(provider);
    // special handling for favored node balancers: they need a FavoredNodesManager before they
    // are initialized
    if (internalBalancer instanceof FavoredNodesPromoter) {
      favoredNodesManager = new FavoredNodesManager(provider);
      ((FavoredNodesPromoter) internalBalancer).setFavoredNodesManager(favoredNodesManager);
    }
    internalBalancer.initialize();
    // init fallback groups
    this.fallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
  }
389
390  public boolean isOnline() {
391    if (this.rsGroupInfoManager == null) {
392      return false;
393    }
394
395    return this.rsGroupInfoManager.isOnline();
396  }
397
398  public boolean isFallbackEnabled() {
399    return fallbackEnabled;
400  }
401
  /**
   * Does nothing; this implementation takes no action when a region comes online.
   */
  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }
405
  /**
   * Does nothing; this implementation takes no action when a region goes offline.
   */
  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }
409
410  @Override
411  public void onConfigurationChange(Configuration conf) {
412    // Refer to HBASE-29933
413    synchronized (this) {
414      // If balance is running, store configuration in pendingConfiguration and return immediately.
415      // Defer the config update.
416      if (isBalancing.get()) {
417        LOG.debug(
418          "Balance is in progress, defer applying configuration change until balance completed.");
419        pendingConfiguration.set(conf);
420      } else {
421        // Apply configuration change immediately.
422        updateConfiguration(conf);
423      }
424    }
425  }
426
427  /**
428   * Applies the given configuration.
429   */
430  private void updateConfiguration(Configuration conf) {
431    boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
432    if (fallbackEnabled != newFallbackEnabled) {
433      LOG.info("Changing the value of {} from {} to {}", FALLBACK_GROUP_ENABLE_KEY, fallbackEnabled,
434        newFallbackEnabled);
435      fallbackEnabled = newFallbackEnabled;
436    }
437    provider.onConfigurationChange(conf);
438    internalBalancer.onConfigurationChange(conf);
439  }
440
441  /**
442   * If a pending configuration was stored during a balance run, apply it and clear the pending
443   * reference.
444   */
445  public void applyPendingConfiguration() {
446    Configuration toApply = pendingConfiguration.getAndSet(null);
447    if (toApply != null) {
448      LOG.info("Applying pending configuration after balance completed.");
449      updateConfiguration(toApply);
450    }
451  }
452
453  /**
454   * Sets {@link #isBalancing} to {@code true} before a balance run starts.
455   */
456  @Override
457  public void onBalancingStart() {
458    LOG.debug("setting isBalancing to true as balance is starting");
459    isBalancing.set(true);
460  }
461
462  /**
463   * Sets {@link #isBalancing} to {@code false} after a balance run completes and applies any
464   * pending configuration that arrived during balancing.
465   */
466  @Override
467  public void onBalancingComplete() {
468    LOG.debug("setting isBalancing to false as balance is completed");
469    isBalancing.set(false);
470    applyPendingConfiguration();
471  }
472
473  @Override
474  public void throttle(RegionPlan plan) throws Exception {
475    long throttlingTime = internalBalancer.getThrottleDurationMs(plan);
476    if (throttlingTime > 0) {
477      try {
478        // Release the monitor while waiting to avoid blocking other threads.
479        wait(throttlingTime);
480      } catch (InterruptedException e) {
481        throw new RuntimeException(e);
482      }
483    }
484  }
485
486  @Override
487  public void stop(String why) {
488    internalBalancer.stop(why);
489  }
490
491  @Override
492  public boolean isStopped() {
493    return internalBalancer.isStopped();
494  }
495
496  public LoadBalancer getInternalBalancer() {
497    return internalBalancer;
498  }
499
500  public FavoredNodesManager getFavoredNodesManager() {
501    return favoredNodesManager;
502  }
503
504  @Override
505  public synchronized void postMasterStartupInitialize() {
506    this.internalBalancer.postMasterStartupInitialize();
507  }
508
509  public void updateBalancerStatus(boolean status) {
510    internalBalancer.updateBalancerStatus(status);
511  }
512
513  private List<ServerName> getFallBackCandidates(List<ServerName> servers) {
514    List<ServerName> serverNames = null;
515    try {
516      RSGroupInfo info = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
517      serverNames = filterOfflineServers(info, servers);
518    } catch (IOException e) {
519      LOG.error("Failed to get default rsgroup info to fallback", e);
520    }
521    return serverNames == null || serverNames.isEmpty() ? servers : serverNames;
522  }
523
  /**
   * Not supported by this balancer: it builds its own {@link ClusterInfoProvider} from the master
   * services in {@link #initialize()}.
   * @throws UnsupportedOperationException always
   */
  @Override
  public void setClusterInfoProvider(ClusterInfoProvider provider) {
    throw new UnsupportedOperationException("Just call set master service instead");
  }
528}