001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import edu.umd.cs.findbugs.annotations.NonNull;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.Optional;
029import java.util.Set;
030import java.util.TreeMap;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.HBaseIOException;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.constraint.ConstraintException;
038import org.apache.hadoop.hbase.master.LoadBalancer;
039import org.apache.hadoop.hbase.master.MasterServices;
040import org.apache.hadoop.hbase.master.RegionPlan;
041import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
042import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
043import org.apache.hadoop.hbase.net.Address;
044import org.apache.hadoop.hbase.util.Pair;
045import org.apache.hadoop.hbase.util.ReflectionUtils;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
051import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
052import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
053import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
054
055/**
 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBASE-6721). It
 * balances regions based on a table's group membership. Most assignment methods contain two
 * exclusive code paths: Online - when the group table is online, and Offline - when it is
 * unavailable. During Offline, regions are assigned based on cached information in zookeeper. If
 * that is unavailable (i.e. bootstrap), regions are assigned randomly. Once the GROUP table has
 * been assigned, the balancer switches to Online and will then start providing appropriate
 * assignments for user tables.
063 */
064@InterfaceAudience.Private
065public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
066  private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);
067
068  private MasterServices masterServices;
069  private volatile RSGroupInfoManager rsGroupInfoManager;
070  private volatile LoadBalancer internalBalancer;
071
072  /**
073   * Set this key to {@code true} to allow region fallback. Fallback to the default rsgroup first,
074   * then fallback to any group if no online servers in default rsgroup. Please keep balancer switch
075   * on at the same time, which is relied on to correct misplaced regions
076   */
077  public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";
078
079  private volatile boolean fallbackEnabled = false;
080
081  /**
082   * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
083   */
084  @InterfaceAudience.Private
085  public RSGroupBasedLoadBalancer() {
086  }
087
088  // must be called after calling initialize
089  @Override
090  public synchronized void updateClusterMetrics(ClusterMetrics sm) {
091    assert internalBalancer != null;
092    internalBalancer.updateClusterMetrics(sm);
093  }
094
095  @Override
096  public synchronized void
097    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
098    internalBalancer.updateBalancerLoadInfo(loadOfAllTable);
099  }
100
101  public void setMasterServices(MasterServices masterServices) {
102    this.masterServices = masterServices;
103  }
104
105  @RestrictedApi(explanation = "Should only be called in tests", link = "",
106      allowedOnPath = ".*/src/test/.*")
107  public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
108    this.rsGroupInfoManager = rsGroupInfoManager;
109  }
110
111  /**
112   * Balance by RSGroup.
113   */
114  @Override
115  public synchronized List<RegionPlan> balanceCluster(
116    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
117    if (!isOnline()) {
118      throw new ConstraintException(
119        RSGroupInfoManager.RSGROUP_TABLE_NAME + " is not online, unable to perform balance");
120    }
121    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
122    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>,
123      List<RegionPlan>> correctedStateAndRegionPlans = correctAssignments(loadOfAllTable);
124    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable =
125      correctedStateAndRegionPlans.getFirst();
126    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
127    // Add RegionPlan for the regions which have been placed according to the region server group
128    // assignment into the movement list
129    try {
130      // For each rsgroup
131      for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
132        Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>();
133        for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable
134          .entrySet()) {
135          TableName tableName = entry.getKey();
136          String targetRSGroupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
137          if (targetRSGroupName == null) {
138            targetRSGroupName = RSGroupInfo.DEFAULT_GROUP;
139          }
140          if (targetRSGroupName.equals(rsgroup.getName())) {
141            loadOfTablesInGroup.put(tableName, entry.getValue());
142          }
143        }
144        List<RegionPlan> groupPlans = null;
145        if (!loadOfTablesInGroup.isEmpty()) {
146          LOG.info("Start Generate Balance plan for group: " + rsgroup.getName());
147          groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup);
148        }
149        if (groupPlans != null) {
150          regionPlans.addAll(groupPlans);
151        }
152      }
153    } catch (IOException exp) {
154      LOG.warn("Exception while balancing cluster.", exp);
155      regionPlans.clear();
156    }
157    return regionPlans;
158  }
159
160  @Override
161  @NonNull
162  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
163    List<ServerName> servers) throws HBaseIOException {
164    Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
165    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
166      generateGroupAssignments(regions, servers);
167    for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
168      Map<ServerName, List<RegionInfo>> result =
169        this.internalBalancer.roundRobinAssignment(pair.getFirst(), pair.getSecond());
170      result.forEach((server, regionInfos) -> assignments
171        .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(regionInfos));
172    }
173    return assignments;
174  }
175
176  @Override
177  @NonNull
178  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
179    List<ServerName> servers) throws HBaseIOException {
180    try {
181      Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
182      List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
183        generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers);
184      for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
185        List<RegionInfo> regionList = pair.getFirst();
186        Map<RegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap();
187        regionList.forEach(r -> currentAssignmentMap.put(r, regions.get(r)));
188        Map<ServerName, List<RegionInfo>> pairResult =
189          this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond());
190        pairResult.forEach((server, rs) -> assignments
191          .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(rs));
192      }
193      return assignments;
194    } catch (IOException e) {
195      throw new HBaseIOException("Failed to do online retain assignment", e);
196    }
197  }
198
199  @Override
200  public ServerName randomAssignment(RegionInfo region, List<ServerName> servers)
201    throws HBaseIOException {
202    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
203      generateGroupAssignments(Lists.newArrayList(region), servers);
204    List<ServerName> filteredServers = pairs.iterator().next().getSecond();
205    return this.internalBalancer.randomAssignment(region, filteredServers);
206  }
207
208  private List<Pair<List<RegionInfo>, List<ServerName>>> generateGroupAssignments(
209    List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
210    try {
211      ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
212      ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
213      RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
214      for (RegionInfo region : regions) {
215        String groupName =
216          Optional.ofNullable(rsGroupInfoManager.getRSGroupOfTable(region.getTable()))
217            .orElse(defaultInfo.getName());
218        regionMap.put(groupName, region);
219      }
220      for (String groupKey : regionMap.keySet()) {
221        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
222        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
223      }
224
225      List<Pair<List<RegionInfo>, List<ServerName>>> result = Lists.newArrayList();
226      List<RegionInfo> fallbackRegions = Lists.newArrayList();
227      for (String groupKey : regionMap.keySet()) {
228        if (serverMap.get(groupKey).isEmpty()) {
229          fallbackRegions.addAll(regionMap.get(groupKey));
230        } else {
231          result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey)));
232        }
233      }
234      if (!fallbackRegions.isEmpty()) {
235        List<ServerName> candidates = null;
236        if (isFallbackEnabled()) {
237          candidates = getFallBackCandidates(servers);
238        }
239        candidates = (candidates == null || candidates.isEmpty())
240          ? Lists.newArrayList(BOGUS_SERVER_NAME)
241          : candidates;
242        result.add(Pair.newPair(fallbackRegions, candidates));
243      }
244      return result;
245    } catch (IOException e) {
246      throw new HBaseIOException("Failed to generate group assignments", e);
247    }
248  }
249
250  private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
251    List<ServerName> onlineServers) {
252    if (RSGroupInfo != null) {
253      return filterServers(RSGroupInfo.getServers(), onlineServers);
254    } else {
255      LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
256      return Collections.emptyList();
257    }
258  }
259
260  /**
261   * Filter servers based on the online servers.
262   * <p/>
263   * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
264   * its contains()'s time complexity as O(logn), which is good enough.
265   * <p/>
266   * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
267   * needed.
268   * @param servers       the servers
269   * @param onlineServers List of servers which are online.
270   * @return the list
271   */
272  private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
273    ArrayList<ServerName> finalList = new ArrayList<>();
274    for (ServerName onlineServer : onlineServers) {
275      if (servers.contains(onlineServer.getAddress())) {
276        finalList.add(onlineServer);
277      }
278    }
279    return finalList;
280  }
281
282  private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
283    correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments)
284      throws IOException {
285    // To return
286    Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>();
287    List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
288    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments
289      .entrySet()) {
290      TableName tableName = assignments.getKey();
291      Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue();
292      Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>();
293      RSGroupInfo targetRSGInfo = null;
294      try {
295        String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
296        if (groupName == null) {
297          LOG.debug("Group not found for table " + tableName + ", using default");
298          groupName = RSGroupInfo.DEFAULT_GROUP;
299        }
300        targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
301      } catch (IOException exp) {
302        LOG.debug("RSGroup information null for region of table " + tableName, exp);
303      }
304      for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) {
305        ServerName currentHostServer = serverRegionMap.getKey();
306        List<RegionInfo> regionInfoList = serverRegionMap.getValue();
307        if (
308          targetRSGInfo == null || !targetRSGInfo.containsServer(currentHostServer.getAddress())
309        ) {
310          regionInfoList.forEach(regionInfo -> {
311            regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null));
312          });
313        } else {
314          correctServerRegion.put(currentHostServer, regionInfoList);
315        }
316      }
317      correctAssignments.put(tableName, correctServerRegion);
318    }
319    // Return correct assignments and region movement plan for mis-placed regions together
320    return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>(
321      correctAssignments, regionPlansForMisplacedRegions);
322  }
323
324  @Override
325  public void initialize() throws HBaseIOException {
326    try {
327      if (rsGroupInfoManager == null) {
328        List<RSGroupAdminEndpoint> cps =
329          masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class);
330        if (cps.size() != 1) {
331          String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size();
332          LOG.error(msg);
333          throw new HBaseIOException(msg);
334        }
335        rsGroupInfoManager = cps.get(0).getGroupInfoManager();
336        if (rsGroupInfoManager == null) {
337          String msg = "RSGroupInfoManager hasn't been initialized";
338          LOG.error(msg);
339          throw new HBaseIOException(msg);
340        }
341        rsGroupInfoManager.start();
342      }
343    } catch (IOException e) {
344      throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e);
345    }
346
347    Configuration conf = masterServices.getConfiguration();
348    // Create the balancer
349    Class<? extends LoadBalancer> balancerClass = conf.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS,
350      StochasticLoadBalancer.class, LoadBalancer.class);
351    if (this.getClass().isAssignableFrom(balancerClass)) {
352      LOG.warn("The internal balancer of RSGroupBasedLoadBalancer cannot be itself, "
353        + "falling back to the default LoadBalancer class");
354      balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass();
355    }
356    internalBalancer = ReflectionUtils.newInstance(balancerClass);
357    internalBalancer.setMasterServices(masterServices);
358    internalBalancer.initialize();
359    // init fallback groups
360    this.fallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
361  }
362
363  public boolean isOnline() {
364    if (this.rsGroupInfoManager == null) {
365      return false;
366    }
367
368    return this.rsGroupInfoManager.isOnline();
369  }
370
371  public boolean isFallbackEnabled() {
372    return fallbackEnabled;
373  }
374
375  @Override
376  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
377  }
378
379  @Override
380  public void regionOffline(RegionInfo regionInfo) {
381  }
382
383  @Override
384  public synchronized void onConfigurationChange(Configuration conf) {
385    boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
386    if (fallbackEnabled != newFallbackEnabled) {
387      LOG.info("Changing the value of {} from {} to {}", FALLBACK_GROUP_ENABLE_KEY, fallbackEnabled,
388        newFallbackEnabled);
389      fallbackEnabled = newFallbackEnabled;
390    }
391    internalBalancer.onConfigurationChange(conf);
392  }
393
394  @Override
395  public void stop(String why) {
396    internalBalancer.stop(why);
397  }
398
399  @Override
400  public boolean isStopped() {
401    return internalBalancer.isStopped();
402  }
403
404  @Override
405  public synchronized void postMasterStartupInitialize() {
406    this.internalBalancer.postMasterStartupInitialize();
407  }
408
409  public void updateBalancerStatus(boolean status) {
410    internalBalancer.updateBalancerStatus(status);
411  }
412
413  private List<ServerName> getFallBackCandidates(List<ServerName> servers) {
414    List<ServerName> serverNames = null;
415    try {
416      RSGroupInfo info = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
417      serverNames = filterOfflineServers(info, servers);
418    } catch (IOException e) {
419      LOG.error("Failed to get default rsgroup info to fallback", e);
420    }
421    return serverNames == null || serverNames.isEmpty() ? servers : serverNames;
422  }
423}